
MAPREDUCE-4875. coverage fixing for org.apache.hadoop.mapred (Aleksey Gorshkov via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1462527 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans 12 years ago
parent commit 442a5a1639
22 changed files with 1582 additions and 794 deletions
  1. + 3 - 0     hadoop-mapreduce-project/CHANGES.txt
  2. + 25 - 2    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
  3. + 4 - 79    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobEndNotifier.java
  4. + 7 - 476   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/QueueManager.java
  5. + 13 - 129  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
  6. + 2 - 49    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java
  7. + 38 - 0    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestClock.java
  8. + 155 - 0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobConf.java
  9. + 56 - 0    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobInfo.java
  10. + 168 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestOldMethodsJobID.java
  11. + 183 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestQueue.java
  12. + 61 - 0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestSkipBadRecords.java
  13. + 128 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskLog.java
  14. + 71 - 0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskLogAppender.java
  15. + 56 - 0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/mapred-queues.xml
  16. + 9 - 3    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
  17. + 66 - 29  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMultiFileSplit.java
  18. + 350 - 6  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestNetworkedJob.java
  19. + 72 - 0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestQueueConfigurationParser.java
  20. + 29 - 2   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestStatisticsCollector.java
  21. + 19 - 9   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
  22. + 67 - 10  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextOutputFormat.java

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -34,6 +34,9 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-4972. Coverage fixing for org.apache.hadoop.mapreduce.jobhistory
     (Aleksey Gorshkov via bobby)
 
+    MAPREDUCE-4875. coverage fixing for org.apache.hadoop.mapred
+    (Aleksey Gorshkov via bobby)
+
   OPTIMIZATIONS
 
     MAPREDUCE-4946. Fix a performance problem for large jobs by reducing the

+ 25 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java

@@ -32,6 +32,8 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.Arrays;
 
+import junit.framework.Assert;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
@@ -78,7 +80,7 @@ public class TestTaskAttemptListenerImpl {
     }
   }
   
-  @Test
+  @Test (timeout=20000)
   public void testGetTask() throws IOException {
     AppContext appCtx = mock(AppContext.class);
     JobTokenSecretManager secret = mock(JobTokenSecretManager.class); 
@@ -136,9 +138,30 @@ public class TestTaskAttemptListenerImpl {
     assertTrue(result.shouldDie);
 
     listener.stop();
+
+    // test JVMID
+    JVMId jvmid = JVMId.forName("jvm_001_002_m_004");
+    assertNotNull(jvmid);
+    try {
+      JVMId.forName("jvm_001_002_m_004_006");
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertEquals(e.getMessage(),
+          "TaskId string : jvm_001_002_m_004_006 is not properly formed");
+    }
+
   }
 
   @Test
+  public void testJVMId() {
+
+    JVMId jvmid = new JVMId("test", 1, true, 2);
+    JVMId jvmid1 = JVMId.forName("jvm_test_0001_m_000002");
+    // the compareTo method should treat the two IDs as equal
+    assertEquals(0, jvmid.compareTo(jvmid1));
+  }
+
+  @Test  (timeout=20000)
   public void testGetMapCompletionEvents() throws IOException {
     TaskAttemptCompletionEvent[] empty = {};
     TaskAttemptCompletionEvent[] taskEvents = {
@@ -205,7 +228,7 @@ public class TestTaskAttemptListenerImpl {
     return tce;
   }
 
-  @Test
+  @Test  (timeout=2000)
   public void testCommitWindow() throws IOException {
     SystemClock clock = new SystemClock();
 

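For orientation, the new assertions above pin down the JVMId string format
(jvm_<jtIdentifier>_<jobId>_<m|r>_<jvmId>). A minimal sketch (not part of the
commit) of the round-trip that testJVMId exercises; it assumes only the JVMId
API visible in the hunk and lives in the same package in case JVMId is not
public:

    package org.apache.hadoop.mapred;

    // Round-trip between the JVMId constructor and its string form.
    public class JVMIdRoundTripSketch {
      public static void main(String[] args) {
        JVMId constructed = new JVMId("test", 1, true, 2);      // map JVM #2 of job 1
        JVMId parsed = JVMId.forName("jvm_test_0001_m_000002"); // same id, parsed back
        System.out.println(constructed.compareTo(parsed));      // 0: the ids are equal

        try {
          JVMId.forName("jvm_001_002_m_004_006");               // one field too many
        } catch (IllegalArgumentException expected) {
          System.out.println(expected.getMessage());            // "... is not properly formed"
        }
      }
    }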
+ 4 - 79
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobEndNotifier.java

@@ -19,8 +19,6 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 
@@ -39,62 +37,9 @@ public class JobEndNotifier {
   private static final Log LOG =
     LogFactory.getLog(JobEndNotifier.class.getName());
 
-  private static Thread thread;
-  private static volatile boolean running;
-  private static BlockingQueue<JobEndStatusInfo> queue =
-    new DelayQueue<JobEndStatusInfo>();
-
-  public static void startNotifier() {
-    running = true;
-    thread = new Thread(
-                        new Runnable() {
-                          public void run() {
-                            try {
-                              while (running) {
-                                sendNotification(queue.take());
-                              }
-                            }
-                            catch (InterruptedException irex) {
-                              if (running) {
-                                LOG.error("Thread has ended unexpectedly", irex);
-                              }
-                            }
-                          }
-
-                          private void sendNotification(JobEndStatusInfo notification) {
-                            try {
-                              int code = httpNotification(notification.getUri());
-                              if (code != 200) {
-                                throw new IOException("Invalid response status code: " + code);
-                              }
-                            }
-                            catch (IOException ioex) {
-                              LOG.error("Notification failure [" + notification + "]", ioex);
-                              if (notification.configureForRetry()) {
-                                try {
-                                  queue.put(notification);
-                                }
-                                catch (InterruptedException iex) {
-                                  LOG.error("Notification queuing error [" + notification + "]",
-                                            iex);
-                                }
-                              }
-                            }
-                            catch (Exception ex) {
-                              LOG.error("Notification failure [" + notification + "]", ex);
-                            }
-                          }
-
-                        }
-
-                        );
-    thread.start();
-  }
 
 
-  public static void stopNotifier() {
-    running = false;
-    thread.interrupt();
-  }
+  
+ 
 
 
   private static JobEndStatusInfo createNotification(JobConf conf,
                                                      JobStatus status) {
@@ -118,17 +63,7 @@ public class JobEndNotifier {
     return notification;
   }
 
-  public static void registerNotification(JobConf jobConf, JobStatus status) {
-    JobEndStatusInfo notification = createNotification(jobConf, status);
-    if (notification != null) {
-      try {
-        queue.put(notification);
-      }
-      catch (InterruptedException iex) {
-        LOG.error("Notification queuing failure [" + notification + "]", iex);
-      }
-    }
-  }
+  
 
 
   private static int httpNotification(String uri) throws IOException {
     URI url = new URI(uri, false);
@@ -194,10 +129,6 @@ public class JobEndNotifier {
       return retryInterval;
     }
 
-    public long getDelayTime() {
-      return delayTime;
-    }
-
     public boolean configureForRetry() {
       boolean retry = false;
       if (getRetryAttempts() > 0) {
@@ -219,13 +150,7 @@ public class JobEndNotifier {
 
 
     @Override
     public boolean equals(Object o) {
-      if (!(o instanceof JobEndStatusInfo)) {
-        return false;
-      }
-      if (delayTime == ((JobEndStatusInfo)o).delayTime) {
-        return true;
-      }
-      return false;
+      return o instanceof JobEndStatusInfo && delayTime == ((JobEndStatusInfo) o).delayTime;
     }
 
     @Override

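The equals() rewrite in the last hunk above is behavior-preserving: the removed
if/else chain collapses into a single boolean expression. The general shape, on
a stand-in class (illustrative, not the Hadoop API):

    // Stand-in illustrating the "instanceof check && field comparison" idiom.
    class DelayTimed {
      private final long delayTime;

      DelayTimed(long delayTime) { this.delayTime = delayTime; }

      @Override
      public boolean equals(Object o) {
        return o instanceof DelayTimed && delayTime == ((DelayTimed) o).delayTime;
      }

      @Override
      public int hashCode() { return Long.valueOf(delayTime).hashCode(); }
    }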
+ 7 - 476
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/QueueManager.java

@@ -24,23 +24,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.QueueState;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.util.StringUtils;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.JsonGenerator;
 
 
 import java.io.BufferedInputStream;
 import java.io.BufferedInputStream;
 import java.io.InputStream;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.List;
 import java.net.URL;
 import java.net.URL;
 
 
@@ -206,113 +195,8 @@ public class QueueManager {
     LOG.info("AllQueues : " + allQueues + "; LeafQueues : " + leafQueues);
   }
 
-  /**
-   * Return the set of leaf level queues configured in the system to
-   * which jobs are submitted.
-   * <p/>
-   * The number of queues configured should be dependent on the Scheduler
-   * configured. Note that some schedulers work with only one queue, whereas
-   * others can support multiple queues.
-   *
-   * @return Set of queue names.
-   */
-  public synchronized Set<String> getLeafQueueNames() {
-    return leafQueues.keySet();
-  }
-
-  /**
-   * Return true if the given user is part of the ACL for the given
-   * {@link QueueACL} name for the given queue.
-   * <p/>
-   * An operation is allowed if all users are provided access for this
-   * operation, or if either the user or any of the groups specified is
-   * provided access.
-   *
-   * @param queueName Queue on which the operation needs to be performed.
-   * @param qACL      The queue ACL name to be checked
-   * @param ugi       The user and groups who wish to perform the operation.
-   * @return true     if the operation is allowed, false otherwise.
-   */
-  public synchronized boolean hasAccess(
-    String queueName, QueueACL qACL, UserGroupInformation ugi) {
-
-    Queue q = leafQueues.get(queueName);
-
-    if (q == null) {
-      LOG.info("Queue " + queueName + " is not present");
-      return false;
-    }
-
-    if(q.getChildren() != null && !q.getChildren().isEmpty()) {
-      LOG.info("Cannot submit job to parent queue " + q.getName());
-      return false;
-    }
-
-    if (!areAclsEnabled()) {
-      return true;
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Checking access for the acl " + toFullPropertyName(queueName,
-        qACL.getAclName()) + " for user " + ugi.getShortUserName());
-    }
-
-    AccessControlList acl = q.getAcls().get(
-        toFullPropertyName(queueName, qACL.getAclName()));
-    if (acl == null) {
-      return false;
-    }
-
-    // Check if user is part of the ACL
-    return acl.isUserAllowed(ugi);
-  }
-
-  /**
-   * Checks whether the given queue is running or not.
-   *
-   * @param queueName name of the queue
-   * @return true, if the queue is running.
-   */
-  synchronized boolean isRunning(String queueName) {
-    Queue q = leafQueues.get(queueName);
-    if (q != null) {
-      return q.getState().equals(QueueState.RUNNING);
-    }
-    return false;
-  }
-
-  /**
-   * Set a generic Object that represents scheduling information relevant
-   * to a queue.
-   * <p/>
-   * A string representation of this Object will be used by the framework
-   * to display in user facing applications like the JobTracker web UI and
-   * the hadoop CLI.
-   *
-   * @param queueName queue for which the scheduling information is to be set.
-   * @param queueInfo scheduling information for this queue.
-   */
-  public synchronized void setSchedulerInfo(
-    String queueName,
-    Object queueInfo) {
-    if (allQueues.get(queueName) != null) {
-      allQueues.get(queueName).setSchedulingInfo(queueInfo);
-    }
-  }
-
-  /**
-   * Return the scheduler information configured for this queue.
-   *
-   * @param queueName queue for which the scheduling information is required.
-   * @return The scheduling information for this queue.
-   */
-  public synchronized Object getSchedulerInfo(String queueName) {
-    if (allQueues.get(queueName) != null) {
-      return allQueues.get(queueName).getSchedulingInfo();
-    }
-    return null;
-  }
-
+ 
+ 
   static final String MSG_REFRESH_FAILURE_WITH_CHANGE_OF_HIERARCHY =
       "Unable to refresh queues because queue-hierarchy changed. "
           + "Retaining existing configuration. ";
@@ -322,73 +206,7 @@ public class QueueManager {
           + " configuration properties. "
           + "Retaining existing configuration throughout the system.";
 
-  /**
-   * Refresh acls, state and scheduler properties for the configured queues.
-   * <p/>
-   * This method reloads configuration related to queues, but does not
-   * support changes to the list of queues or hierarchy. The expected usage
-   * is that an administrator can modify the queue configuration file and
-   * fire an admin command to reload queue configuration. If there is a
-   * problem in reloading configuration, then this method guarantees that
-   * existing queue configuration is untouched and in a consistent state.
-   * 
-   * @param schedulerRefresher
-   * @throws IOException when queue configuration file is invalid.
-   */
-  synchronized void refreshQueues(Configuration conf,
-      QueueRefresher schedulerRefresher)
-      throws IOException {
-
-    // Create a new configuration parser using the passed conf object.
-    QueueConfigurationParser cp =
-        getQueueConfigurationParser(conf, true, areAclsEnabled);
-
-    /*
-     * (1) Validate the refresh of properties owned by QueueManager. As of now,
-     * while refreshing queue properties, we only check that the hierarchy is
-     * the same w.r.t queue names, ACLs and state for each queue and don't
-     * support adding new queues or removing old queues
-     */
-    if (!root.isHierarchySameAs(cp.getRoot())) {
-      LOG.warn(MSG_REFRESH_FAILURE_WITH_CHANGE_OF_HIERARCHY);
-      throw new IOException(MSG_REFRESH_FAILURE_WITH_CHANGE_OF_HIERARCHY);
-    }
-
-    /*
-     * (2) QueueManager owned properties are validated. Now validate and
-     * refresh the properties of scheduler in a single step.
-     */
-    if (schedulerRefresher != null) {
-      try {
-        schedulerRefresher.refreshQueues(cp.getRoot().getJobQueueInfo().getChildren());
-      } catch (Throwable e) {
-        StringBuilder msg =
-            new StringBuilder(
-                "Scheduler's refresh-queues failed with the exception : "
-                    + StringUtils.stringifyException(e));
-        msg.append("\n");
-        msg.append(MSG_REFRESH_FAILURE_WITH_SCHEDULER_FAILURE);
-        LOG.error(msg.toString());
-        throw new IOException(msg.toString());
-      }
-    }
-
-    /*
-     * (3) Scheduler has validated and refreshed its queues successfully, now
-     * refresh the properties owned by QueueManager
-     */
-
-    // First copy the scheduling information recursively into the new
-    // queue-hierarchy. This is done to retain old scheduling information. This
-    // is done after scheduler refresh and not before it because during refresh,
-    // schedulers may wish to change their scheduling info objects too.
-    cp.getRoot().copySchedulingInfo(this.root);
-
-    // Now switch roots.
-    initialize(cp);
-
-    LOG.info("Queue configuration is refreshed successfully.");
-  }
+ 
 
 
   // this method is for internal use only
   public static final String toFullPropertyName(
@@ -397,172 +215,10 @@ public class QueueManager {
     return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
   }
 
-  /**
-   * Return an array of {@link JobQueueInfo} objects for all the
-   * queues configurated in the system.
-   *
-   * @return array of JobQueueInfo objects.
-   */
-  synchronized JobQueueInfo[] getJobQueueInfos() {
-    ArrayList<JobQueueInfo> queueInfoList = new ArrayList<JobQueueInfo>();
-    for (String queue : allQueues.keySet()) {
-      JobQueueInfo queueInfo = getJobQueueInfo(queue);
-      if (queueInfo != null) {
-        queueInfoList.add(queueInfo);
-      }
-    }
-    return queueInfoList.toArray(
-      new JobQueueInfo[queueInfoList.size()]);
-  }
-
-
-  /**
-   * Return {@link JobQueueInfo} for a given queue.
-   *
-   * @param queue name of the queue
-   * @return JobQueueInfo for the queue, null if the queue is not found.
-   */
-  synchronized JobQueueInfo getJobQueueInfo(String queue) {
-    if (allQueues.containsKey(queue)) {
-      return allQueues.get(queue).getJobQueueInfo();
-    }
-
-    return null;
-  }
-
-  /**
-   * JobQueueInfo for all the queues.
-   * <p/>
-   * Contribs can use this data structure to either create a hierarchy or for
-   * traversing.
-   * They can also use this to refresh properties in case of refreshQueues
-   *
-   * @return a map for easy navigation.
-   */
-  synchronized Map<String, JobQueueInfo> getJobQueueInfoMapping() {
-    Map<String, JobQueueInfo> m = new HashMap<String, JobQueueInfo>();
-
-    for (String key : allQueues.keySet()) {
-      m.put(key, allQueues.get(key).getJobQueueInfo());
-    }
-
-    return m;
-  }
-
-  /**
-   * Generates the array of QueueAclsInfo object.
-   * <p/>
-   * The array consists of only those queues for which user has acls.
-   *
-   * @return QueueAclsInfo[]
-   * @throws java.io.IOException
-   */
-  synchronized QueueAclsInfo[] getQueueAcls(UserGroupInformation ugi)
-    throws IOException {
-    //List of all QueueAclsInfo objects , this list is returned
-    ArrayList<QueueAclsInfo> queueAclsInfolist =
-      new ArrayList<QueueAclsInfo>();
-    QueueACL[] qAcls = QueueACL.values();
-    for (String queueName : leafQueues.keySet()) {
-      QueueAclsInfo queueAclsInfo = null;
-      ArrayList<String> operationsAllowed = null;
-      for (QueueACL qAcl : qAcls) {
-        if (hasAccess(queueName, qAcl, ugi)) {
-          if (operationsAllowed == null) {
-            operationsAllowed = new ArrayList<String>();
-          }
-          operationsAllowed.add(qAcl.getAclName());
-        }
-      }
-      if (operationsAllowed != null) {
-        //There is atleast 1 operation supported for queue <queueName>
-        //, hence initialize queueAclsInfo
-        queueAclsInfo = new QueueAclsInfo(
-          queueName, operationsAllowed.toArray
-            (new String[operationsAllowed.size()]));
-        queueAclsInfolist.add(queueAclsInfo);
-      }
-    }
-    return queueAclsInfolist.toArray(
-      new QueueAclsInfo[queueAclsInfolist.size()]);
-  }
-
-  /**
-   * ONLY FOR TESTING - Do not use in production code.
-   * This method is used for setting up of leafQueues only.
-   * We are not setting the hierarchy here.
-   *
-   * @param queues
-   */
-  synchronized void setQueues(Queue[] queues) {
-    root.getChildren().clear();
-    leafQueues.clear();
-    allQueues.clear();
-
-    for (Queue queue : queues) {
-      root.addChild(queue);
-    }
-    //At this point we have root populated
-    //update data structures leafNodes.
-    leafQueues = getRoot().getLeafQueues();
-    allQueues.putAll(getRoot().getInnerQueues());
-    allQueues.putAll(leafQueues);
-  }
-
-  /**
-   * Return an array of {@link JobQueueInfo} objects for the root
-   * queues configured in the system.
-   * <p/>
-   * Root queues are queues that are at the top-most level in the
-   * hierarchy of queues in mapred-queues.xml, or they are the queues
-   * configured in the mapred.queue.names key in mapred-site.xml.
-   *
-   * @return array of JobQueueInfo objects for root level queues.
-   */
-
-  JobQueueInfo[] getRootQueues() {
-    List<JobQueueInfo> list = getRoot().getJobQueueInfo().getChildren();
-    return list.toArray(new JobQueueInfo[list.size()]);
-  }
-
-  /**
-   * Get the complete hierarchy of children for queue
-   * queueName
-   *
-   * @param queueName
-   * @return
-   */
-  JobQueueInfo[] getChildQueues(String queueName) {
-    List<JobQueueInfo> list =
-      allQueues.get(queueName).getJobQueueInfo().getChildren();
-    if (list != null) {
-      return list.toArray(new JobQueueInfo[list.size()]);
-    } else {
-      return new JobQueueInfo[0];
-    }
-  }
-
-  /**
-   * Used only for testing purposes .
-   * This method is unstable as refreshQueues would leave this
-   * data structure in unstable state.
-   *
-   * @param queueName
-   * @return
-   */
-  Queue getQueue(String queueName) {
-    return this.allQueues.get(queueName);
-  }
-
-
-  /**
-   * Return if ACLs are enabled for the Map/Reduce system
-   *
-   * @return true if ACLs are enabled.
-   */
-  boolean areAclsEnabled() {
-    return areAclsEnabled;
-  }
+  
+ 
+ 
+ 
 
 
   /**
    * Used only for test.
@@ -573,131 +229,6 @@ public class QueueManager {
     return root;
   }
 
-  /**
-   * Returns the specific queue ACL for the given queue.
-   * Returns null if the given queue does not exist or the acl is not
-   * configured for that queue.
-   * If acls are disabled(mapreduce.cluster.acls.enabled set to false), returns
-   * ACL with all users.
-   */
-  synchronized AccessControlList getQueueACL(String queueName,
-      QueueACL qACL) {
-    if (areAclsEnabled) {
-      Queue q = leafQueues.get(queueName);
-      if (q != null) {
-        return q.getAcls().get(toFullPropertyName(
-          queueName, qACL.getAclName()));
-      }
-      else {
-        LOG.warn("Queue " + queueName + " is not present.");
-        return null;
-      }
-    }
-    return new AccessControlList("*");
-  }
-
-  /**
-   * Dumps the configuration of hierarchy of queues
-   * @param out the writer object to which dump is written
-   * @throws IOException
-   */
-  static void dumpConfiguration(Writer out,Configuration conf) throws IOException {
-    dumpConfiguration(out, null,conf);
-  }
   
   
-  /***
-   * Dumps the configuration of hierarchy of queues with 
-   * the xml file path given. It is to be used directly ONLY FOR TESTING.
-   * @param out the writer object to which dump is written to.
-   * @param configFile the filename of xml file
-   * @throws IOException
-   */
-  static void dumpConfiguration(Writer out, String configFile,
-      Configuration conf) throws IOException {
-    if (conf != null && conf.get(DeprecatedQueueConfigurationParser.
-        MAPRED_QUEUE_NAMES_KEY) != null) {
-      return;
-    }
-    
-    JsonFactory dumpFactory = new JsonFactory();
-    JsonGenerator dumpGenerator = dumpFactory.createJsonGenerator(out);
-    QueueConfigurationParser parser;
-    boolean aclsEnabled = false;
-    if (conf != null) {
-      aclsEnabled = conf.getBoolean(MRConfig.MR_ACLS_ENABLED, false);
-    }
-    if (configFile != null && !"".equals(configFile)) {
-      parser = new QueueConfigurationParser(configFile, aclsEnabled);
-    }
-    else {
-      parser = getQueueConfigurationParser(null, false, aclsEnabled);
-    }
-    dumpGenerator.writeStartObject();
-    dumpGenerator.writeFieldName("queues");
-    dumpGenerator.writeStartArray();
-    dumpConfiguration(dumpGenerator,parser.getRoot().getChildren());
-    dumpGenerator.writeEndArray();
-    dumpGenerator.writeEndObject();
-    dumpGenerator.flush();
-  }
-
-  /**
-   * method to perform depth-first search and write the parameters of every 
-   * queue in JSON format.
-   * @param dumpGenerator JsonGenerator object which takes the dump and flushes
-   *  to a writer object
-   * @param rootQueues the top-level queues
-   * @throws JsonGenerationException
-   * @throws IOException
-   */
-  private static void dumpConfiguration(JsonGenerator dumpGenerator,
-      Set<Queue> rootQueues) throws JsonGenerationException, IOException {
-    for (Queue queue : rootQueues) {
-      dumpGenerator.writeStartObject();
-      dumpGenerator.writeStringField("name", queue.getName());
-      dumpGenerator.writeStringField("state", queue.getState().toString());
-      AccessControlList submitJobList = null;
-      AccessControlList administerJobsList = null;
-      if (queue.getAcls() != null) {
-        submitJobList =
-          queue.getAcls().get(toFullPropertyName(queue.getName(),
-              QueueACL.SUBMIT_JOB.getAclName()));
-        administerJobsList =
-          queue.getAcls().get(toFullPropertyName(queue.getName(),
-              QueueACL.ADMINISTER_JOBS.getAclName()));
-      }
-      String aclsSubmitJobValue = " ";
-      if (submitJobList != null ) {
-        aclsSubmitJobValue = submitJobList.getAclString();
-      }
-      dumpGenerator.writeStringField("acl_submit_job", aclsSubmitJobValue);
-      String aclsAdministerValue = " ";
-      if (administerJobsList != null) {
-        aclsAdministerValue = administerJobsList.getAclString();
-      }
-      dumpGenerator.writeStringField("acl_administer_jobs",
-          aclsAdministerValue);
-      dumpGenerator.writeFieldName("properties");
-      dumpGenerator.writeStartArray();
-      if (queue.getProperties() != null) {
-        for (Map.Entry<Object, Object>property :
-          queue.getProperties().entrySet()) {
-          dumpGenerator.writeStartObject();
-          dumpGenerator.writeStringField("key", (String)property.getKey());
-          dumpGenerator.writeStringField("value", (String)property.getValue());
-          dumpGenerator.writeEndObject();
-        }
-      }
-      dumpGenerator.writeEndArray();
-      Set<Queue> childQueues = queue.getChildren();
-      dumpGenerator.writeFieldName("children");
-      dumpGenerator.writeStartArray();
-      if (childQueues != null && childQueues.size() > 0) {
-        dumpConfiguration(dumpGenerator, childQueues);
-      }
-      dumpGenerator.writeEndArray();
-      dumpGenerator.writeEndObject();
-    }
-  }
 
 
 }

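Of the QueueManager code removed above, dumpConfiguration is the least
self-describing. A standalone sketch of the JSON shape it produced, using the
same org.codehaus.jackson calls as the removed method; the queue values here
are invented:

    import java.io.StringWriter;

    import org.codehaus.jackson.JsonFactory;
    import org.codehaus.jackson.JsonGenerator;

    // Emits the shape the removed QueueManager.dumpConfiguration wrote:
    // {"queues":[{"name":...,"state":...,"acl_submit_job":...,
    //             "acl_administer_jobs":...,"properties":[...],"children":[...]}]}
    public class QueueDumpShapeSketch {
      public static void main(String[] args) throws Exception {
        StringWriter out = new StringWriter();
        JsonGenerator gen = new JsonFactory().createJsonGenerator(out);
        gen.writeStartObject();
        gen.writeFieldName("queues");
        gen.writeStartArray();
        gen.writeStartObject();
        gen.writeStringField("name", "default");          // invented queue name
        gen.writeStringField("state", "running");
        gen.writeStringField("acl_submit_job", " ");      // " " when no ACL is configured
        gen.writeStringField("acl_administer_jobs", " ");
        gen.writeFieldName("properties");
        gen.writeStartArray();                            // one {key,value} object per property
        gen.writeEndArray();
        gen.writeFieldName("children");
        gen.writeStartArray();                            // child queues recurse here
        gen.writeEndArray();
        gen.writeEndObject();
        gen.writeEndArray();
        gen.writeEndObject();
        gen.flush();
        System.out.println(out);
      }
    }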
+ 13 - 129
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java

@@ -119,7 +119,7 @@ public class TaskLog {
     //stderr:<start-offset in the stderr file> <length>
     //syslog:<start-offset in the syslog file> <length>
     LogFileDetail l = new LogFileDetail();
-    String str = null;
+    String str;
     try {
       str = fis.readLine();
       if (str == null) { // the file doesn't have anything
@@ -199,7 +199,7 @@ public class TaskLog {
 
 
     BufferedOutputStream bos = 
       new BufferedOutputStream(
-        SecureIOUtils.createForWrite(tmpIndexFile, 0644));
+        SecureIOUtils.createForWrite(tmpIndexFile, 644));
     DataOutputStream dos = new DataOutputStream(bos);
     //the format of the index file is
     //LOG_DIR: <the dir where the task logs are really stored>
@@ -391,74 +391,7 @@ public class TaskLog {
     return conf.getLong(JobContext.TASK_USERLOG_LIMIT, 0) * 1024;
   }
 
-  /**
-   * Wrap a command in a shell to capture stdout and stderr to files.
-   * If the tailLength is 0, the entire output will be saved.
-   * @param cmd The command and the arguments that should be run
-   * @param stdoutFilename The filename that stdout should be saved to
-   * @param stderrFilename The filename that stderr should be saved to
-   * @param tailLength The length of the tail to be saved.
-   * @return the modified command that should be run
-   */
-  public static List<String> captureOutAndError(List<String> cmd, 
-                                                File stdoutFilename,
-                                                File stderrFilename,
-                                                long tailLength
-                                               ) throws IOException {
-    return captureOutAndError(null, cmd, stdoutFilename,
-                              stderrFilename, tailLength, false);
-  }
-
-  /**
-   * Wrap a command in a shell to capture stdout and stderr to files.
-   * Setup commands such as setting memory limit can be passed which 
-   * will be executed before exec.
-   * If the tailLength is 0, the entire output will be saved.
-   * @param setup The setup commands for the execed process.
-   * @param cmd The command and the arguments that should be run
-   * @param stdoutFilename The filename that stdout should be saved to
-   * @param stderrFilename The filename that stderr should be saved to
-   * @param tailLength The length of the tail to be saved.
-   * @return the modified command that should be run
-   */
-  public static List<String> captureOutAndError(List<String> setup,
-                                                List<String> cmd, 
-                                                File stdoutFilename,
-                                                File stderrFilename,
-                                                long tailLength
-                                               ) throws IOException {
-    return captureOutAndError(setup, cmd, stdoutFilename, stderrFilename,
-                              tailLength, false);
-  }
 
 
-  /**
-   * Wrap a command in a shell to capture stdout and stderr to files.
-   * Setup commands such as setting memory limit can be passed which 
-   * will be executed before exec.
-   * If the tailLength is 0, the entire output will be saved.
-   * @param setup The setup commands for the execed process.
-   * @param cmd The command and the arguments that should be run
-   * @param stdoutFilename The filename that stdout should be saved to
-   * @param stderrFilename The filename that stderr should be saved to
-   * @param tailLength The length of the tail to be saved.
-   * @param pidFileName The name of the pid-file. pid-file's usage is deprecated
-   * @return the modified command that should be run
-   * 
-   * @deprecated     pidFiles are no more used. Instead pid is exported to
-   *                 env variable JVM_PID.
-   */
-  @Deprecated
-  public static List<String> captureOutAndError(List<String> setup,
-                                                List<String> cmd, 
-                                                File stdoutFilename,
-                                                File stderrFilename,
-                                                long tailLength,
-                                                String pidFileName
-                                               ) throws IOException {
-    return captureOutAndError(setup, cmd, stdoutFilename, stderrFilename,
-        tailLength, false);
-  }
-  
   /**
    * Wrap a command in a shell to capture stdout and stderr to files.
    * Setup commands such as setting memory limit can be passed which 
@@ -506,10 +439,10 @@ public class TaskLog {
                                       boolean useSetsid)
                                 throws IOException {
     
-    String stdout = FileUtil.makeShellPath(stdoutFilename);
-    String stderr = FileUtil.makeShellPath(stderrFilename);    
-    StringBuffer mergedCmd = new StringBuffer();
-    
+    String stdOut = FileUtil.makeShellPath(stdoutFilename);
+    String stdErr = FileUtil.makeShellPath(stderrFilename);
+    StringBuffer mergedCmd= new StringBuffer();
+
     // Export the pid of taskJvm to env variable JVM_PID.
     // Currently pid is not used on Windows
     if (!Shell.WINDOWS) {
@@ -536,54 +469,24 @@ public class TaskLog {
       mergedCmd.append(" -c ");
       mergedCmd.append(tailLength);
       mergedCmd.append(" >> ");
-      mergedCmd.append(stdout);
+      mergedCmd.append(stdOut);
       mergedCmd.append(" ; exit $PIPESTATUS ) 2>&1 | ");
       mergedCmd.append(tailCommand);
       mergedCmd.append(" -c ");
       mergedCmd.append(tailLength);
       mergedCmd.append(" >> ");
-      mergedCmd.append(stderr);
+      mergedCmd.append(stdErr);
       mergedCmd.append(" ; exit $PIPESTATUS");
     } else {
       mergedCmd.append(" 1>> ");
-      mergedCmd.append(stdout);
+      mergedCmd.append(stdOut);
       mergedCmd.append(" 2>> ");
-      mergedCmd.append(stderr);
+      mergedCmd.append(stdErr);
     }
     return mergedCmd.toString();
   }
   
-  /**
-   * Construct the command line for running the debug script
-   * @param cmd The command and the arguments that should be run
-   * @param stdoutFilename The filename that stdout should be saved to
-   * @param stderrFilename The filename that stderr should be saved to
-   * @param tailLength The length of the tail to be saved.
-   * @return the command line as a String
-   * @throws IOException
-   */
-  static String buildDebugScriptCommandLine(List<String> cmd, String debugout)
-  throws IOException {
-    StringBuilder mergedCmd = new StringBuilder();
-    mergedCmd.append("exec ");
-    boolean isExecutable = true;
-    for(String s: cmd) {
-      if (isExecutable) {
-        // the executable name needs to be expressed as a shell path for the  
-        // shell to find it.
-        mergedCmd.append(FileUtil.makeShellPath(new File(s)));
-        isExecutable = false; 
-      } else {
-        mergedCmd.append(s);
-      }
-      mergedCmd.append(" ");
-    }
-    mergedCmd.append(" < /dev/null ");
-    mergedCmd.append(" >");
-    mergedCmd.append(debugout);
-    mergedCmd.append(" 2>&1 ");
-    return mergedCmd.toString();
-  }
+
   /**
    * Add quotes to each of the command strings and
    * return as a single string 
@@ -595,7 +498,7 @@ public class TaskLog {
    */
   public static String addCommand(List<String> cmd, boolean isExecutable) 
   throws IOException {
-    StringBuffer command = new StringBuffer();
+    StringBuilder command = new StringBuilder();
     for(String s: cmd) {
     	command.append('\'');
       if (isExecutable) {
@@ -612,25 +515,7 @@ public class TaskLog {
     return command.toString();
   }
   
-  /**
-   * Wrap a command in a shell to capture debug script's 
-   * stdout and stderr to debugout.
-   * @param cmd The command and the arguments that should be run
-   * @param debugoutFilename The filename that stdout and stderr
-   *  should be saved to.
-   * @return the modified command that should be run
-   * @throws IOException
-   */
-  public static List<String> captureDebugOut(List<String> cmd, 
-                                             File debugoutFilename
-                                            ) throws IOException {
-    String debugout = FileUtil.makeShellPath(debugoutFilename);
-    List<String> result = new ArrayList<String>(3);
-    result.add(bashCommand);
-    result.add("-c");
-    result.add(buildDebugScriptCommandLine(cmd, debugout));
-    return result;
-  }
+  
   
   
   /**
    * Method to return the location of user log directory.
@@ -644,7 +529,6 @@ public class TaskLog {
   /**
    * Get the user log directory for the job jobid.
    * 
-   * @param jobid
    * @return user log directory for the job
    */
   public static File getJobDir(JobID jobid) {

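The surviving captureOutAndError path above assembles a bash pipeline. A
standalone sketch of the string it builds when tailLength > 0; the task
command and log paths here are illustrative:

    // Mirrors the append sequence in the hunk above (tailCommand is "tail").
    public class MergedCmdSketch {
      public static void main(String[] args) {
        String cmd = "./mytask arg1";   // illustrative task command
        String stdOut = "logs/stdout";  // illustrative log paths
        String stdErr = "logs/stderr";
        long tailLength = 1024;

        StringBuilder mergedCmd = new StringBuilder();
        mergedCmd.append("( ").append(cmd).append(" | tail -c ").append(tailLength);
        mergedCmd.append(" >> ").append(stdOut);
        mergedCmd.append(" ; exit $PIPESTATUS ) 2>&1 | tail -c ").append(tailLength);
        mergedCmd.append(" >> ").append(stdErr).append(" ; exit $PIPESTATUS");

        // ( ./mytask arg1 | tail -c 1024 >> logs/stdout ; exit $PIPESTATUS ) 2>&1
        //   | tail -c 1024 >> logs/stderr ; exit $PIPESTATUS
        System.out.println(mergedCmd);
      }
    }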
+ 2 - 49
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java

@@ -58,7 +58,6 @@ public abstract class TaskStatus implements Writable, Cloneable {
   private volatile State runState;
   private String diagnosticInfo;
   private String stateString;
-  private String taskTracker;
   private int numSlots;
     
   private long startTime; //in ms
@@ -98,7 +97,6 @@ public abstract class TaskStatus implements Writable, Cloneable {
     this.runState = runState;
     setDiagnosticInfo(diagnosticInfo);
     setStateString(stateString);
-    this.taskTracker = taskTracker;
     this.phase = phase;
     this.counters = counters;
     this.includeAllCounters = true;
@@ -106,17 +104,12 @@ public abstract class TaskStatus implements Writable, Cloneable {
   
   
   public TaskAttemptID getTaskID() { return taskid; }
   public abstract boolean getIsMap();
-  public int getNumSlots() {
-    return numSlots;
-  }
 
 
   public float getProgress() { return progress; }
   public void setProgress(float progress) {
     this.progress = progress;
   } 
   public State getRunState() { return runState; }
-  public String getTaskTracker() {return taskTracker;}
-  public void setTaskTracker(String tracker) { this.taskTracker = tracker;}
   public void setRunState(State runState) { this.runState = runState; }
   public String getDiagnosticInfo() { return diagnosticInfo; }
   public void setDiagnosticInfo(String info) {
@@ -163,7 +156,6 @@ public abstract class TaskStatus implements Writable, Cloneable {
 
 
   /**
    * Set the next record range which is going to be processed by Task.
-   * @param nextRecordRange
    */
   public void setNextRecordRange(SortedRanges.Range nextRecordRange) {
     this.nextRecordRange = nextRecordRange;
@@ -212,7 +204,6 @@ public abstract class TaskStatus implements Writable, Cloneable {
 
 
   /**
    * Set shuffle finish time. 
-   * @param shuffleFinishTime 
    */
   void setShuffleFinishTime(long shuffleFinishTime) {}
 
@@ -230,7 +221,7 @@ public abstract class TaskStatus implements Writable, Cloneable {
   
   
   /**
    * Set map phase finish time. 
-   * @param mapFinishTime 
+   * @param mapFinishTime
    */
   void setMapFinishTime(long mapFinishTime) {}
 
@@ -306,21 +297,6 @@ public abstract class TaskStatus implements Writable, Cloneable {
     }
   }
 
-  boolean inTaskCleanupPhase() {
-    return (this.phase == TaskStatus.Phase.CLEANUP && 
-      (this.runState == TaskStatus.State.FAILED_UNCLEAN || 
-      this.runState == TaskStatus.State.KILLED_UNCLEAN));
-  }
-  
-  public boolean getIncludeAllCounters() {
-    return includeAllCounters;
-  }
-  
-  public void setIncludeAllCounters(boolean send) {
-    includeAllCounters = send;
-    counters.setWriteAllCounters(send);
-  }
-  
   /**
    * Get task's counters.
    */
@@ -494,18 +470,7 @@ public abstract class TaskStatus implements Writable, Cloneable {
   // Factory-like methods to create/read/write appropriate TaskStatus objects
   //////////////////////////////////////////////////////////////////////////////
   
-  static TaskStatus createTaskStatus(DataInput in, TaskAttemptID taskId, 
-                                     float progress, int numSlots,
-                                     State runState, String diagnosticInfo,
-                                     String stateString, String taskTracker,
-                                     Phase phase, Counters counters) 
-  throws IOException {
-    boolean isMap = in.readBoolean();
-    return createTaskStatus(isMap, taskId, progress, numSlots, runState, 
-                            diagnosticInfo, stateString, taskTracker, phase, 
-                            counters);
-  }
-  
+
   static TaskStatus createTaskStatus(boolean isMap, TaskAttemptID taskId, 
                                      float progress, int numSlots,
                                      State runState, String diagnosticInfo,
@@ -523,17 +488,5 @@ public abstract class TaskStatus implements Writable, Cloneable {
     return (isMap) ? new MapTaskStatus() : new ReduceTaskStatus();
   }
 
-  static TaskStatus readTaskStatus(DataInput in) throws IOException {
-    boolean isMap = in.readBoolean();
-    TaskStatus taskStatus = createTaskStatus(isMap);
-    taskStatus.readFields(in);
-    return taskStatus;
-  }
-  
-  static void writeTaskStatus(DataOutput out, TaskStatus taskStatus) 
-  throws IOException {
-    out.writeBoolean(taskStatus.getIsMap());
-    taskStatus.write(out);
-  }
 }
 }
 
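The removed readTaskStatus/writeTaskStatus pair above framed each serialized
record with a leading boolean so the reader could pick MapTaskStatus or
ReduceTaskStatus before reading fields. A minimal sketch of that framing,
with a stand-in float payload instead of the real TaskStatus fields:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Stand-in for the removed framing: a boolean type tag (getIsMap())
    // precedes the subclass's Writable fields.
    public class IsMapFramingSketch {
      static void write(DataOutput out, boolean isMap, float progress) throws IOException {
        out.writeBoolean(isMap);  // type tag first, as writeTaskStatus did
        out.writeFloat(progress); // then the status fields
      }

      static String read(DataInput in) throws IOException {
        boolean isMap = in.readBoolean(); // readTaskStatus chose the subclass here
        float progress = in.readFloat();
        return (isMap ? "map" : "reduce") + " progress=" + progress;
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        write(new DataOutputStream(bytes), true, 0.5f);
        System.out.println(read(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray()))));
      }
    }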
+ 38 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestClock.java

@@ -0,0 +1,38 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+/**
+ *  test Clock class
+ *
+ */
+public class TestClock {
+
+  @Test  (timeout=2000)
+  public void testClock(){
+    Clock clock= new Clock();
+    long templateTime=System.currentTimeMillis();
+    long time=clock.getTime();
+    assertEquals(templateTime, time,30);
+
+  }
+}

+ 155 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobConf.java

@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.regex.Pattern;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+/**
+ * test JobConf
+ * 
+ */
+public class TestJobConf {
+
+  /**
+   * test getters and setters of JobConf
+   */
+  @SuppressWarnings("deprecation")
+  @Test  (timeout=5000)
+  public void testJobConf() {
+    JobConf conf = new JobConf();
+    // test default value
+    Pattern pattern = conf.getJarUnpackPattern();
+    assertEquals(Pattern.compile("(?:classes/|lib/).*").toString(),
+        pattern.toString());
+    // default value
+    assertFalse(conf.getKeepFailedTaskFiles());
+    conf.setKeepFailedTaskFiles(true);
+    assertTrue(conf.getKeepFailedTaskFiles());
+
+    // default value
+    assertNull(conf.getKeepTaskFilesPattern());
+    conf.setKeepTaskFilesPattern("123454");
+    assertEquals("123454", conf.getKeepTaskFilesPattern());
+
+    // default value
+    assertNotNull(conf.getWorkingDirectory());
+    conf.setWorkingDirectory(new Path("test"));
+    assertTrue(conf.getWorkingDirectory().toString().endsWith("test"));
+
+    // default value
+    assertEquals(1, conf.getNumTasksToExecutePerJvm());
+
+    // default value
+    assertNull(conf.getKeyFieldComparatorOption());
+    conf.setKeyFieldComparatorOptions("keySpec");
+    assertEquals("keySpec", conf.getKeyFieldComparatorOption());
+
+    // default value
+    assertFalse(conf.getUseNewReducer());
+    conf.setUseNewReducer(true);
+    assertTrue(conf.getUseNewReducer());
+
+    // default
+    assertTrue(conf.getMapSpeculativeExecution());
+    assertTrue(conf.getReduceSpeculativeExecution());
+    assertTrue(conf.getSpeculativeExecution());
+    conf.setReduceSpeculativeExecution(false);
+    assertTrue(conf.getSpeculativeExecution());
+
+    conf.setMapSpeculativeExecution(false);
+    assertFalse(conf.getSpeculativeExecution());
+    assertFalse(conf.getMapSpeculativeExecution());
+    assertFalse(conf.getReduceSpeculativeExecution());
+
+    conf.setSessionId("ses");
+    assertEquals("ses", conf.getSessionId());
+
+    assertEquals(3, conf.getMaxTaskFailuresPerTracker());
+    conf.setMaxTaskFailuresPerTracker(2);
+    assertEquals(2, conf.getMaxTaskFailuresPerTracker());
+
+    assertEquals(0, conf.getMaxMapTaskFailuresPercent());
+    conf.setMaxMapTaskFailuresPercent(50);
+    assertEquals(50, conf.getMaxMapTaskFailuresPercent());
+
+    assertEquals(0, conf.getMaxReduceTaskFailuresPercent());
+    conf.setMaxReduceTaskFailuresPercent(70);
+    assertEquals(70, conf.getMaxReduceTaskFailuresPercent());
+
+    // by default
+    assertEquals(JobPriority.NORMAL.name(), conf.getJobPriority().name());
+    conf.setJobPriority(JobPriority.HIGH);
+    assertEquals(JobPriority.HIGH.name(), conf.getJobPriority().name());
+
+    assertNull(conf.getJobSubmitHostName());
+    conf.setJobSubmitHostName("hostname");
+    assertEquals("hostname", conf.getJobSubmitHostName());
+
+    // default
+    assertNull(conf.getJobSubmitHostAddress());
+    conf.setJobSubmitHostAddress("ww");
+    assertEquals("ww", conf.getJobSubmitHostAddress());
+
+    // default value
+    assertFalse(conf.getProfileEnabled());
+    conf.setProfileEnabled(true);
+    assertTrue(conf.getProfileEnabled());
+
+    // default value
+    assertEquals(conf.getProfileTaskRange(true).toString(), "0-2");
+    assertEquals(conf.getProfileTaskRange(false).toString(), "0-2");
+    conf.setProfileTaskRange(true, "0-3");
+    assertEquals(conf.getProfileTaskRange(false).toString(), "0-2");
+    assertEquals(conf.getProfileTaskRange(true).toString(), "0-3");
+
+    // default value
+    assertNull(conf.getMapDebugScript());
+    conf.setMapDebugScript("mDbgScript");
+    assertEquals("mDbgScript", conf.getMapDebugScript());
+
+    // default value
+    assertNull(conf.getReduceDebugScript());
+    conf.setReduceDebugScript("rDbgScript");
+    assertEquals("rDbgScript", conf.getReduceDebugScript());
+
+    // default value
+    assertNull(conf.getJobLocalDir());
+
+    assertEquals("default", conf.getQueueName());
+    conf.setQueueName("qname");
+    assertEquals("qname", conf.getQueueName());
+
+    assertEquals(1, conf.computeNumSlotsPerMap(100L));
+    assertEquals(1, conf.computeNumSlotsPerReduce(100L));
+
+    conf.setMemoryForMapTask(100 * 1000);
+    assertEquals(1000, conf.computeNumSlotsPerMap(100L));
+    conf.setMemoryForReduceTask(1000 * 1000);
+    assertEquals(1000, conf.computeNumSlotsPerReduce(1000L));
+
+    assertEquals(-1, conf.getMaxPhysicalMemoryForTask());
+    assertEquals("The variable key is no longer used.",
+        JobConf.deprecatedString("key"));
+
+  }
+}

+ 56 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobInfo.java

@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * test class JobInfo
+ * 
+ * 
+ */
+public class TestJobInfo {
+  @Test  (timeout=1000)
+  public void testJobInfo() throws IOException {
+    JobID jid = new JobID("001", 1);
+    Text user = new Text("User");
+    Path path = new Path("/tmp/test");
+    JobInfo info = new JobInfo(jid, user, path);
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    info.write(new DataOutputStream(out));
+
+    JobInfo copyinfo = new JobInfo();
+    copyinfo.readFields(new DataInputStream(new ByteArrayInputStream(out
+        .toByteArray())));
+    assertEquals(info.getJobID().toString(), copyinfo.getJobID().toString());
+    assertEquals(info.getJobSubmitDir().getName(), copyinfo.getJobSubmitDir()
+        .getName());
+    assertEquals(info.getUser().toString(), copyinfo.getUser().toString());
+
+  }
+}

+ 168 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestOldMethodsJobID.java

@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.TaskCompletionEvent.Status;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Test deprecated methods
+ *
+ */
+public class TestOldMethodsJobID {
+
+  /**
+   * test deprecated methods of TaskID
+   * @throws IOException
+   */
+  @SuppressWarnings("deprecation")
+  @Test  (timeout=1000)
+  public void testDepricatedMethods() throws IOException {
+    JobID jid = new JobID();
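+    // in the deprecated constructors the boolean flag means isMap: true -> MAP, false -> REDUCE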
+    TaskID test = new TaskID(jid, true, 1);
+    assertEquals(test.getTaskType(), TaskType.MAP);
+    test = new TaskID(jid, false, 1);
+    assertEquals(test.getTaskType(), TaskType.REDUCE);
+
+    test = new TaskID("001", 1, false, 1);
+    assertEquals(test.getTaskType(), TaskType.REDUCE);
+    test = new TaskID("001", 1, true, 1);
+    assertEquals(test.getTaskType(), TaskType.MAP);
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+    test.write(new DataOutputStream(out));
+    TaskID ti = TaskID.read(new DataInputStream(new ByteArrayInputStream(out
+        .toByteArray())));
+    assertEquals(ti.toString(), test.toString());
+    assertEquals("task_001_0001_m_000002",
+        TaskID.getTaskIDsPattern("001", 1, true, 2));
+    assertEquals("task_003_0001_m_000004",
+        TaskID.getTaskIDsPattern("003", 1, TaskType.MAP, 4));
+    assertEquals("003_0001_m_000004",
+        TaskID.getTaskIDsPatternWOPrefix("003", 1, TaskType.MAP, 4).toString());
+   
+  }
+  
+  /**
+   * test JobID
+   * @throws IOException 
+   */
+  @SuppressWarnings("deprecation")
+  @Test (timeout=1000)
+  public void testJobID() throws IOException{
+    JobID jid = new JobID("001",2);
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    jid.write(new DataOutputStream(out));
+    assertEquals(jid, JobID.read(new DataInputStream(
+        new ByteArrayInputStream(out.toByteArray()))));
+    assertEquals("job_001_0001", JobID.getJobIDsPattern("001", 1));
+  }
+  /**
+   * test deprecated methods of TaskCompletionEvent
+   */
+  @SuppressWarnings("deprecation")
+  @Test (timeout=1000)
+  public void testTaskCompletionEvent() {
+    TaskAttemptID taid = new TaskAttemptID("001", 1, TaskType.REDUCE, 2, 3);
+    TaskCompletionEvent template = new TaskCompletionEvent(12, taid, 13, true,
+        Status.SUCCEEDED, "httptracker");
+    TaskCompletionEvent testEl = TaskCompletionEvent.downgrade(template);
+    testEl.setTaskAttemptId(taid);
+    testEl.setTaskTrackerHttp("httpTracker");
+
+    testEl.setTaskId("attempt_001_0001_m_000002_04");
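+    // the attempt suffix "04" is parsed numerically, so the id is normalized to "_4"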
+    assertEquals("attempt_001_0001_m_000002_4",testEl.getTaskId());
+
+    testEl.setTaskStatus(Status.OBSOLETE);
+    assertEquals(Status.OBSOLETE.toString(), testEl.getStatus().toString());
+
+    testEl.setTaskRunTime(20);
+    assertEquals(testEl.getTaskRunTime(), 20);
+    testEl.setEventId(16);
+    assertEquals(testEl.getEventId(), 16);
+
+  }
+
+  /**
+   * test deprecated methods of JobProfile
+   * @throws IOException
+   */
+  @SuppressWarnings("deprecation")
+  @Test  (timeout=1000)
+  public void testJobProfile() throws IOException {
+
+    JobProfile profile = new JobProfile("user", "job_001_03", "jobFile", "uri",
+        "name");
+    assertEquals("job_001_0003", profile.getJobId());
+    assertEquals("default", profile.getQueueName());
+    // serialization test
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    profile.write(new DataOutputStream(out));
+
+    JobProfile profile2 = new JobProfile();
+    profile2.readFields(new DataInputStream(new ByteArrayInputStream(out
+        .toByteArray())));
+
+    assertEquals(profile2.name, profile.name);
+    assertEquals(profile2.jobFile, profile.jobFile);
+    assertEquals(profile2.queueName, profile.queueName);
+    assertEquals(profile2.url, profile.url);
+    assertEquals(profile2.user, profile.user);
+  }
+  /**
+   * test TaskAttemptID 
+   */
+  @SuppressWarnings( "deprecation" )
+  @Test  (timeout=1000)
+  public void testTaskAttemptID() {
+    TaskAttemptID task = new TaskAttemptID("001", 2, true, 3, 4);
+    assertEquals("attempt_001_0002_m_000003_4",
+        TaskAttemptID.getTaskAttemptIDsPattern("001", 2, true, 3, 4));
+    assertEquals("task_001_0002_m_000003", task.getTaskID().toString());
+    assertEquals("attempt_001_0001_r_000002_3",
+        TaskAttemptID.getTaskAttemptIDsPattern("001", 1, TaskType.REDUCE, 2, 3));
+    assertEquals("001_0001_m_000001_2", TaskAttemptID
+        .getTaskAttemptIDsPatternWOPrefix("001", 1, TaskType.MAP, 1, 2).toString());
+    
+  }
+  
+  /**
+   * test Reporter.NULL
+   * 
+   */
+  
+  @Test  (timeout=1000)
+  public void testReporter(){
+    Reporter nullReporter=Reporter.NULL;
+    assertNull(nullReporter.getCounter(null));
+    assertNull(nullReporter.getCounter("group", "name"));
+    // Reporter.NULL does not support getInputSplit
+    try {
+      assertNull(nullReporter.getInputSplit());
+    } catch (UnsupportedOperationException e) {
+      assertEquals("NULL reporter has no input", e.getMessage());
+    }
+    assertEquals(0, nullReporter.getProgress(), 0.01);
+
+  }
+}

+ 183 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestQueue.java

@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+import static junit.framework.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * TestQueue checks the configuration and state handling of Queue and QueueManager
+ */
+public class TestQueue {
+
+  /**
+   * test QueueManager
+   * configuration from file
+   * 
+   * @throws IOException
+   */
+  @Test  (timeout=20000)
+  public void testQueue() throws IOException {
+    File f = null;
+    try {
+      f = writeFile();
+
+      QueueManager manager = new QueueManager(f.getCanonicalPath(), true);
+      Queue root = manager.getRoot();
+      assertTrue(root.getChildren().size() == 2);
+      Iterator<Queue> iterator = root.getChildren().iterator();
+      Queue firstSubQueue = iterator.next();
+      assertTrue(firstSubQueue.getName().equals("first"));
+      assertEquals(
+          firstSubQueue.getAcls().get("mapred.queue.first.acl-submit-job")
+              .toString(),
+          "Users [user1, user2] and members of the groups [group1, group2] are allowed");
+      Queue secondSubQueue = iterator.next();
+      assertTrue(secondSubQueue.getName().equals("second"));
+      assertEquals(secondSubQueue.getProperties().getProperty("key"), "value");
+      assertEquals(secondSubQueue.getProperties().getProperty("key1"), "value1");
+      // test status
+      assertEquals(firstSubQueue.getState().getStateName(), "running");
+      assertEquals(secondSubQueue.getState().getStateName(), "stopped");
+
+      Set<String> template = new HashSet<String>();
+      template.add("first");
+      template.add("second");
+
+      // test user access
+
+      UserGroupInformation mockUGI = mock(UserGroupInformation.class);
+      when(mockUGI.getShortUserName()).thenReturn("user1");
+      String[] groups = { "group1" };
+      when(mockUGI.getGroupNames()).thenReturn(groups);
+      when(mockUGI.getShortUserName()).thenReturn("user3");
+
+      iterator = root.getChildren().iterator();
+      Queue firstSubQueue1 = iterator.next();
+      Queue secondSubQueue1 = iterator.next();
+      // test the equals method
+      assertTrue(firstSubQueue.equals(firstSubQueue1));
+      assertEquals("running",firstSubQueue1.getState().getStateName());
+      assertEquals("stopped", secondSubQueue1.getState().getStateName());
+
+
+      // test JobQueueInfo
+      assertEquals("first",firstSubQueue.getJobQueueInfo().getQueueName());
+      assertEquals("running", firstSubQueue.getJobQueueInfo().getQueueState());
+      assertEquals(secondSubQueue.getJobQueueInfo().getChildren().size(), 0);
+    
+
+    } finally {
+      if (f != null) {
+        f.delete();
+      }
+    }
+  }
+
+  private Configuration getConfiguration() {
+    Configuration conf = new Configuration();
+    conf.set(DeprecatedQueueConfigurationParser.MAPRED_QUEUE_NAMES_KEY,
+        "first,second");
+    conf.set(QueueManager.QUEUE_CONF_PROPERTY_NAME_PREFIX
+        + "first.acl-submit-job", "user1,user2 group1,group2");
+    conf.set(MRConfig.MR_ACLS_ENABLED, "true");
+    conf.set(QueueManager.QUEUE_CONF_PROPERTY_NAME_PREFIX + "first.state",
+        "running");
+    conf.set(QueueManager.QUEUE_CONF_PROPERTY_NAME_PREFIX + "second.state",
+        "stopped");
+    return conf;
+  }
+
+
+  /**
+   * test QueueManager initialized from a Configuration object
+   * 
+   * @throws IOException
+   */
+
+  @Test  (timeout=1000)
+  public void test2Queue() throws IOException {
+    Configuration conf = getConfiguration();
+
+    QueueManager manager = new QueueManager(conf);
+
+    Queue root = manager.getRoot();
+    
+    // test children queues
+    assertTrue(root.getChildren().size() == 2);
+    Iterator<Queue> iterator = root.getChildren().iterator();
+    Queue firstSubQueue = iterator.next();
+    assertTrue(firstSubQueue.getName().equals("first"));
+    assertEquals(
+        firstSubQueue.getAcls().get("mapred.queue.first.acl-submit-job")
+            .toString(),
+        "Users [user1, user2] and members of the groups [group1, group2] are allowed");
+    Queue secondSubQueue = iterator.next();
+    assertTrue(secondSubQueue.getName().equals("second"));
+
+    assertEquals("running", firstSubQueue.getState().getStateName());
+    assertEquals( "stopped", secondSubQueue.getState().getStateName());
+
+// test leaf queue
+    Set<String> template = new HashSet<String>();
+    template.add("first");
+    template.add("second");
+    conf.set(QueueManager.QUEUE_CONF_PROPERTY_NAME_PREFIX + "first.state",
+        "stopped");
+
+    
+  }
+  /**
+   * write a queue configuration file in the mapred-queues.xml format
+   * @return the generated temporary file
+   * @throws IOException
+   */
+  private File writeFile() throws IOException {
+
+    File f = null;
+    f = File.createTempFile("tst", "xml");
+    BufferedWriter out = new BufferedWriter(new FileWriter(f));
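+    // emit a minimal two-queue hierarchy in the mapred-queues.xml format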
+    String properties = "<properties><property key=\"key\" value=\"value\"/><property key=\"key1\" value=\"value1\"/></properties>";
+    out.write("<queues>");
+    out.newLine();
+    out.write("<queue><name>first</name><acl-submit-job>user1,user2 group1,group2</acl-submit-job><acl-administer-jobs>user3,user4 group3,group4</acl-administer-jobs><state>running</state></queue>");
+    out.newLine();
+    out.write("<queue><name>second</name><acl-submit-job>u1,u2 g1,g2</acl-submit-job>"
+        + properties + "<state>stopped</state></queue>");
+    out.newLine();
+    out.write("</queues>");
+    out.flush();
+    out.close();
+    return f;
+
+  }
+  
+}

+ 61 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestSkipBadRecords.java

@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * test SkipBadRecords
+ */
+public class TestSkipBadRecords {
+  @Test (timeout=10000)
+  public void testSkipBadRecords() {
+    // test default values
+    Configuration conf = new Configuration();
+    assertEquals(2, SkipBadRecords.getAttemptsToStartSkipping(conf));
+    assertTrue(SkipBadRecords.getAutoIncrMapperProcCount(conf));
+    assertTrue(SkipBadRecords.getAutoIncrReducerProcCount(conf));
+    assertEquals(0, SkipBadRecords.getMapperMaxSkipRecords(conf));
+    assertEquals(0, SkipBadRecords.getReducerMaxSkipGroups(conf), 0);
+    assertNull(SkipBadRecords.getSkipOutputPath(conf));
+
+    // test setters
+    SkipBadRecords.setAttemptsToStartSkipping(conf, 5);
+    SkipBadRecords.setAutoIncrMapperProcCount(conf, false);
+    SkipBadRecords.setAutoIncrReducerProcCount(conf, false);
+    SkipBadRecords.setMapperMaxSkipRecords(conf, 6L);
+    SkipBadRecords.setReducerMaxSkipGroups(conf, 7L);
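+    // the skip output path is only settable on a JobConf, hence the separate object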
+    JobConf jc = new JobConf();
+    SkipBadRecords.setSkipOutputPath(jc, new Path("test"));
+    
+    // test getters 
+    assertEquals(5, SkipBadRecords.getAttemptsToStartSkipping(conf));
+    assertFalse(SkipBadRecords.getAutoIncrMapperProcCount(conf));
+    assertFalse(SkipBadRecords.getAutoIncrReducerProcCount(conf));
+    assertEquals(6L, SkipBadRecords.getMapperMaxSkipRecords(conf));
+    assertEquals(7L, SkipBadRecords.getReducerMaxSkipGroups(conf), 0);
+    assertEquals("test",SkipBadRecords.getSkipOutputPath(jc).toString());
+    
+  }
+
+}

+ 128 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskLog.java

@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.mapred.TaskLog.LogName;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.junit.Test;
+
+import static junit.framework.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests the TaskLog utility class
+ */
+public class TestTaskLog {
+  /**
+   * test the TaskLog file and index-file helpers
+   * @throws IOException
+   */
+  @Test (timeout=20000)
+  public void testTaskLog() throws IOException {
+    // test TaskLog
+    System.setProperty(MRJobConfig.TASK_LOG_DIR, "testString");
+    assertEquals(TaskLog.getMRv2LogDir(), "testString");
+    TaskAttemptID taid= mock(TaskAttemptID.class);
+    JobID jid= new JobID("job",1);
+    
+    when(taid.getJobID()).thenReturn(jid);
+    when(taid.toString()).thenReturn("JobId");
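+    // the stubbed job id and attempt string determine the log paths asserted below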
+    
+    File f = TaskLog.getTaskLogFile(taid, true, LogName.STDOUT);
+    assertTrue(f.getAbsolutePath().endsWith("testString/stdout"));
+
+    // test getRealTaskLogFileLocation
+    File indexFile = TaskLog.getIndexFile(taid, true);
+    if (!indexFile.getParentFile().exists()) {
+      indexFile.getParentFile().mkdirs();
+    }
+    indexFile.delete();
+    indexFile.createNewFile();
+
+    TaskLog.syncLogs("location", taid, true);
+
+    assertTrue(indexFile.getAbsolutePath().endsWith(
+        "userlogs/job_job_0001/JobId.cleanup/log.index"));
+
+    f = TaskLog.getRealTaskLogFileLocation(taid, true, LogName.DEBUGOUT);
+    if (f != null) {
+      assertTrue(f.getAbsolutePath().endsWith("location/debugout"));
+      FileUtils.copyFile(indexFile, f);
+    }
+    // test obtainLogDirOwner
+    assertTrue(TaskLog.obtainLogDirOwner(taid).length() > 0);
+
+  }
+  
+  public  String readTaskLog(TaskLog.LogName filter,
+      org.apache.hadoop.mapred.TaskAttemptID taskId, boolean isCleanup)
+      throws IOException {
+    // string buffer to store task log
+    StringBuffer result = new StringBuffer();
+    int res;
+
+    // reads the whole tasklog into inputstream
+    InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1,
+        isCleanup);
+    // construct string log from inputstream.
+    byte[] b = new byte[65536];
+    while (true) {
+      res = taskLogReader.read(b);
+      if (res > 0) {
+        // append only the bytes actually read, not the whole buffer
+        result.append(new String(b, 0, res));
+      } else {
+        break;
+      }
+    }
+    taskLogReader.close();
+
+    // trim the string and return it
+    String str = result.toString();
+    str = str.trim();
+    return str;
+  }
+
+  /**
+   * test without TASK_LOG_DIR
+   * @throws IOException
+   */
+  @Test  (timeout=20000)
+  public void testTaskLogWithoutTaskLogDir() throws IOException {
+    System.clearProperty(MRJobConfig.TASK_LOG_DIR);
+
+    // test TaskLog
+    
+    assertEquals(TaskLog.getMRv2LogDir(), null);
+    TaskAttemptID taid= mock(TaskAttemptID.class);
+    JobID jid= new JobID("job",1);
+    
+    when(taid.getJobID()).thenReturn(jid);
+    when(taid.toString()).thenReturn("JobId");
+    
+    File f = TaskLog.getTaskLogFile(taid, true, LogName.STDOUT);
+    assertTrue(f.getAbsolutePath().endsWith("stdout"));
+
+  }
+  
+}

+ 71 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskLogAppender.java

@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.mapred;
+
+import java.io.StringWriter;
+import java.io.Writer;
+
+import org.apache.log4j.Category;
+import org.apache.log4j.Layout;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.log4j.Priority;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestTaskLogAppender {
+  /**
+   * test TaskLogAppender
+   */
+  @SuppressWarnings("deprecation")
+  @Test (timeout=1000)
+  public void testTaskLogAppender(){
+    TaskLogAppender appender= new TaskLogAppender();
+    
+    System.setProperty(TaskLogAppender.TASKID_PROPERTY,"attempt_01_02_m03_04_001");
+    System.setProperty(TaskLogAppender.LOGSIZE_PROPERTY, "1003");
+    appender.activateOptions();
+    assertEquals(appender.getTaskId(), "attempt_01_02_m03_04_001");
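+    // the configured 1003 bytes are rounded down by the appender to 1000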
+    assertEquals(appender.getTotalLogFileSize(),1000);
+    assertEquals(appender.getIsCleanup(),false);
+    
+    // test writer   
+    Writer writer= new StringWriter();
+    appender.setWriter(writer);
+    Layout layout =  new PatternLayout("%-5p [%t]: %m%n");
+    appender.setLayout(layout);
+    Category logger= Logger.getLogger(getClass().getName());
+    LoggingEvent event = new LoggingEvent("fqnOfCategoryClass", logger, Priority.INFO, "message", new Throwable());
+    appender.append(event);
+    appender.flush();
+    appender.close();
+    assertTrue(writer.toString().length()>0);
+    
+    // a cleanup flag set explicitly should not be changed by activateOptions()
+    appender= new TaskLogAppender();
+    appender.setIsCleanup(true);
+    appender.activateOptions();
+    assertEquals(appender.getIsCleanup(),true);
+
+  
+  }
+  
+}

+ 56 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/mapred-queues.xml

@@ -0,0 +1,56 @@
+<?xml version="1.0"?>
+<!-- This is the template for queue configuration. The format supports nesting of
+     queues within queues - a feature called hierarchical queues. All queues are
+     defined within the 'queues' tag which is the top level element for this
+     XML document.
+     The 'aclsEnabled' attribute should be set to true, if ACLs should be checked
+     on queue operations such as submitting jobs, killing jobs etc. -->
+<queues aclsEnabled="false">
+
+  <!-- Configuration for a queue is specified by defining a 'queue' element. -->
+  <queue>
+
+    <!-- Name of a queue. Queue name cannot contain a ':'  -->
+    <name>default</name>
+
+    <!-- properties for a queue, typically used by schedulers,
+    can be defined here -->
+    <properties>
+    </properties>
+
+    <!-- State of the queue. If running, the queue will accept new jobs.
+         If stopped, the queue will not accept new jobs. -->
+    <state>running</state>
+
+    <!-- Specifies the ACLs to check for submitting jobs to this queue.
+         If set to '*', it allows all users to submit jobs to the queue.
+         For specifying a list of users and groups the format to use is
+         user1,user2 group1,group2 -->
+    <acl-submit-job>*</acl-submit-job>
+
+    <!-- Specifies the ACLs to check for modifying jobs in this queue.
+         Modifications include killing jobs, tasks of jobs or changing
+         priorities.
+         If set to '*', it allows all users to modify jobs in the queue.
+         For specifying a list of users and groups the format to use is
+         user1,user2 group1,group2 -->
+    <acl-administer-jobs>*</acl-administer-jobs>
+  </queue>
+
+  <!-- Here is a sample of a hierarchical queue configuration
+       where q2 is a child of q1. In this example, q2 is a leaf level
+       queue as it has no queues configured within it. Currently, ACLs
+       and state are only supported for the leaf level queues.
+       Note also the usage of properties for the queue q2. -->
+  <queue>
+    <name>q1</name>
+    <queue>
+      <name>q2</name>
+      <properties>
+        <property key="capacity" value="20"/>
+        <property key="user-limit" value="30"/>
+      </properties>
+    </queue>
+  </queue>
+ 
+</queues>

+ 9 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java

@@ -26,14 +26,14 @@ import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
 
 import org.junit.Test;
-
+import static org.junit.Assert.*;
 public class TestIFile {
 
-  @Test
   /**
    * Create an IFile.Writer using GzipCodec since this codec does not
    * have a compressor when run via the tests (ie no native libraries).
    */
+  @Test (timeout=10000)
   public void testIFileWriterWithCodec() throws Exception {
     Configuration conf = new Configuration();
     FileSystem localFs = FileSystem.getLocal(conf);
@@ -47,7 +47,7 @@ public class TestIFile {
     writer.close();
   }
 
-  @Test
+  @Test (timeout=10000)
   /** Same as above but create a reader. */
   public void testIFileReaderWithCodec() throws Exception {
     Configuration conf = new Configuration();
@@ -59,5 +59,11 @@ public class TestIFile {
     IFile.Reader<Text, Text> reader =
       new IFile.Reader<Text, Text>(conf, rfs, path, codec, null);
     reader.close();
+
+    // test the checksum stream
+    byte[] ab = new byte[100];
+    int readed = reader.checksumIn.readWithChecksum(ab, 0, ab.length);
+    assertEquals(readed, reader.checksumIn.getChecksum().length);
+
   }
 }

+ 66 - 29
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMultiFileSplit.java

@@ -21,41 +21,78 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Arrays;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.junit.Test;
 
-public class TestMultiFileSplit extends TestCase{
-
-    public void testReadWrite() throws Exception {
-      MultiFileSplit split = new MultiFileSplit(new JobConf(), new Path[] {new Path("/test/path/1"), new Path("/test/path/2")}, new long[] {100,200});
-        
-      ByteArrayOutputStream bos = null;
-      byte[] result = null;
-      try {    
-        bos = new ByteArrayOutputStream();
-        split.write(new DataOutputStream(bos));
-        result = bos.toByteArray();
-      } finally {
-        IOUtils.closeStream(bos);
-      }
-      
-      MultiFileSplit readSplit = new MultiFileSplit();
-      ByteArrayInputStream bis = null;
-      try {
-        bis = new ByteArrayInputStream(result);
-        readSplit.readFields(new DataInputStream(bis));
-      } finally {
-        IOUtils.closeStream(bis);
-      }
-      
-      assertTrue(split.getLength() != 0);
-      assertEquals(split.getLength(), readSplit.getLength());
-      assertTrue(Arrays.equals(split.getPaths(), readSplit.getPaths()));
-      assertTrue(Arrays.equals(split.getLengths(), readSplit.getLengths()));
-      System.out.println(split.toString());
+import static org.junit.Assert.*;
+
+/**
+ * test MultiFileSplit class
+ */
+public class TestMultiFileSplit {
+  
+  @Test (timeout=10000)
+  public void testReadWrite() throws Exception {
+    MultiFileSplit split = new MultiFileSplit(new JobConf(), new Path[] {
+        new Path("/test/path/1"), new Path("/test/path/2") }, new long[] { 100,
+        200 });
+
+    ByteArrayOutputStream bos = null;
+    byte[] result = null;
+    try {
+      bos = new ByteArrayOutputStream();
+      split.write(new DataOutputStream(bos));
+      result = bos.toByteArray();
+    } finally {
+      IOUtils.closeStream(bos);
+    }
+
+    MultiFileSplit readSplit = new MultiFileSplit();
+    ByteArrayInputStream bis = null;
+    try {
+      bis = new ByteArrayInputStream(result);
+      readSplit.readFields(new DataInputStream(bis));
+    } finally {
+      IOUtils.closeStream(bis);
     }
+
+    assertTrue(split.getLength() != 0);
+    assertEquals(split.getLength(), readSplit.getLength());
+    assertTrue(Arrays.equals(split.getPaths(), readSplit.getPaths()));
+    assertTrue(Arrays.equals(split.getLengths(), readSplit.getLengths()));
+    System.out.println(split.toString());
+  }
+
+  /**
+   * test method getLocations
+   * 
+   * @throws IOException
+   */
+  @Test (timeout=10000)
+  public void testgetLocations() throws IOException {
+    JobConf job = new JobConf();
+
+    File tmpFile = File.createTempFile("test", "txt");
+    tmpFile.createNewFile();
+    OutputStream out = new FileOutputStream(tmpFile);
+    out.write("tempfile".getBytes());
+    out.flush();
+    out.close();
+    Path[] path = { new Path(tmpFile.getAbsolutePath()) };
+    long[] lengths = { 100 };
+
+    MultiFileSplit split = new MultiFileSplit(job, path, lengths);
+    String[] locations = split.getLocations();
+    assertTrue(locations.length == 1);
+    assertEquals(locations[0], "localhost");
+  }
 }

+ 350 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestNetworkedJob.java

@@ -19,18 +19,45 @@
 package org.apache.hadoop.mapred;
 
 import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-import java.util.List;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
+import org.apache.hadoop.mapred.JobClient.NetworkedJob;
+import org.apache.hadoop.mapred.JobClient.TaskStatusFilter;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.yarn.YarnException;
 import org.junit.Test;
-import static org.mockito.Mockito.*;
-
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 
 public class TestNetworkedJob {
-
-  @SuppressWarnings("deprecation")
-  @Test
+  private static String TEST_ROOT_DIR = new File(System.getProperty(
+      "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
+  private static Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local");
+  private static Path inFile = new Path(testDir, "in");
+  private static Path outDir = new Path(testDir, "out");
+
+  @Test (timeout=10000)
   public void testGetNullCounters() throws Exception {
     //mock creation
     Job mockJob = mock(Job.class);
@@ -41,4 +68,321 @@ public class TestNetworkedJob {
     //verification
     verify(mockJob).getCounters();
   }
+  @Test (timeout=100000)
+  public void testGetJobStatus() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    MiniMRClientCluster mr = null;
+    FileSystem fileSys = null;
+
+    try {
+      mr = MiniMRClientClusterFactory.create(this.getClass(), 2,
+          new Configuration());
+
+      JobConf job = new JobConf(mr.getConfig());
+
+      fileSys = FileSystem.get(job);
+      fileSys.delete(testDir, true);
+      FSDataOutputStream out = fileSys.create(inFile, true);
+      out.writeBytes("This is a test file");
+      out.close();
+
+      FileInputFormat.setInputPaths(job, inFile);
+      FileOutputFormat.setOutputPath(job, outDir);
+
+      job.setInputFormat(TextInputFormat.class);
+      job.setOutputFormat(TextOutputFormat.class);
+
+      job.setMapperClass(IdentityMapper.class);
+      job.setReducerClass(IdentityReducer.class);
+      job.setNumReduceTasks(0);
+
+
+    } finally {
+      if (fileSys != null) {
+        fileSys.delete(testDir, true);
+      }
+      if (mr != null) {
+        mr.stop();
+      }
+    }
+  }
+/**
+ * test NetworkedJob
+ * @throws Exception
+ */
+  @SuppressWarnings({ "unused", "deprecation" })
+  @Test (timeout=100000)
+  public void testNetworkedJob() throws Exception {
+    // mock creation
+    MiniMRClientCluster mr = null;
+    FileSystem fileSys = null;
+
+    try {
+      Configuration conf = new Configuration();
+      mr = MiniMRClientClusterFactory.create(this.getClass(), 2, conf);
+
+      JobConf job = new JobConf(mr.getConfig());
+
+      fileSys = FileSystem.get(job);
+      fileSys.delete(testDir, true);
+      FSDataOutputStream out = fileSys.create(inFile, true);
+      out.writeBytes("This is a test file");
+      out.close();
+
+      FileInputFormat.setInputPaths(job, inFile);
+      FileOutputFormat.setOutputPath(job, outDir);
+
+      job.setInputFormat(TextInputFormat.class);
+      job.setOutputFormat(TextOutputFormat.class);
+
+      job.setMapperClass(IdentityMapper.class);
+      job.setReducerClass(IdentityReducer.class);
+      job.setNumReduceTasks(0);
+
+      JobClient client = new JobClient(mr.getConfig());
+
+      RunningJob rj = client.submitJob(job);
+      JobID jobId = rj.getID();
+      NetworkedJob runningJob = (NetworkedJob) client.getJob(jobId);
+      runningJob.setJobPriority(JobPriority.HIGH.name());
+      // test getters
+      assertTrue(runningJob.getConfiguration().toString()
+          .endsWith("0001/job.xml"));
+      assertEquals(runningJob.getID(), jobId);
+      assertEquals(runningJob.getJobID(), jobId.toString());
+      assertTrue(runningJob.getJobName().contains( "hadoop-"));
+      assertTrue(runningJob.getJobFile().endsWith(
+          ".staging/" + runningJob.getJobID() + "/job.xml"));
+      assertTrue(runningJob.getTrackingURL().length() > 0);
+      assertTrue(runningJob.mapProgress() == 0.0f);
+      assertTrue(runningJob.reduceProgress() == 0.0f);
+      assertTrue(runningJob.cleanupProgress() == 0.0f);
+      assertTrue(runningJob.setupProgress() == 0.0f);
+
+      TaskCompletionEvent[] tce = runningJob.getTaskCompletionEvents(0);
+      assertEquals(tce.length, 0);
+
+      assertEquals(runningJob.getHistoryUrl(),"");
+      assertFalse(runningJob.isRetired());
+      assertEquals( runningJob.getFailureInfo(),"");
+      assertEquals(client.getMapTaskReports(jobId).length, 0);
+      
+      try {
+        client.getSetupTaskReports(jobId);
+      } catch (YarnException e) {
+        assertEquals(e.getMessage(), "Unrecognized task type: JOB_SETUP");
+      }
+      try {
+        client.getCleanupTaskReports(jobId);
+      } catch (YarnException e) {
+        assertEquals(e.getMessage(), "Unrecognized task type: JOB_CLEANUP");
+      }
+      assertEquals(client.getReduceTaskReports(jobId).length, 0);
+      // test ClusterStatus
+      ClusterStatus status = client.getClusterStatus(true);
+      assertEquals(status.getActiveTrackerNames().size(), 2);
+      // these methods are not implemented and always return an empty array or null
+      assertEquals(status.getBlacklistedTrackers(), 0);
+      assertEquals(status.getBlacklistedTrackerNames().size(), 0);
+      assertEquals(status.getBlackListedTrackersInfo().size(), 0);
+      assertEquals(status.getJobTrackerStatus(), JobTrackerStatus.RUNNING);
+      assertEquals(status.getMapTasks(), 1);
+      assertEquals(status.getMaxMapTasks(), 20);
+      assertEquals(status.getMaxReduceTasks(), 4);
+      assertEquals(status.getNumExcludedNodes(), 0);
+      assertEquals(status.getReduceTasks(), 1);
+      assertEquals(status.getTaskTrackers(), 2);
+      assertEquals(status.getTTExpiryInterval(), 0);
+      assertEquals(status.getJobTrackerStatus(), JobTrackerStatus.RUNNING);
+
+      // test read and write
+      ByteArrayOutputStream dataOut = new ByteArrayOutputStream();
+      status.write(new DataOutputStream(dataOut));
+      ClusterStatus status2 = new ClusterStatus();
+
+      status2.readFields(new DataInputStream(new ByteArrayInputStream(dataOut
+          .toByteArray())));
+      assertEquals(status.getActiveTrackerNames(),
+          status2.getActiveTrackerNames());
+      assertEquals(status.getBlackListedTrackersInfo(),
+          status2.getBlackListedTrackersInfo());
+      assertEquals(status.getMapTasks(), status2.getMapTasks());
+
+      // test TaskStatusFilter
+      JobClient.setTaskOutputFilter(job, TaskStatusFilter.ALL);
+      assertEquals(JobClient.getTaskOutputFilter(job), TaskStatusFilter.ALL);
+
+      // runningJob.setJobPriority(JobPriority.HIGH.name());
+
+      // test default map
+      assertEquals(client.getDefaultMaps(), 20);
+      assertEquals(client.getDefaultReduces(), 4);
+      assertEquals(client.getSystemDir().getName(), "jobSubmitDir");
+      // test queue information
+      JobQueueInfo[] rootQueueInfo = client.getRootQueues();
+      assertEquals(rootQueueInfo.length, 1);
+      assertEquals(rootQueueInfo[0].getQueueName(), "default");
+      JobQueueInfo[] qinfo = client.getQueues();
+      assertEquals(qinfo.length, 1);
+      assertEquals(qinfo[0].getQueueName(), "default");
+      assertEquals(client.getChildQueues("default").length, 0);
+      assertEquals(client.getJobsFromQueue("default").length, 1);
+      assertTrue(client.getJobsFromQueue("default")[0].getJobFile().endsWith(
+          "/job.xml"));
+
+      JobQueueInfo qi = client.getQueueInfo("default");
+      assertEquals(qi.getQueueName(), "default");
+      assertEquals(qi.getQueueState(), "running");
+
+      QueueAclsInfo[] aai = client.getQueueAclsForCurrentUser();
+      assertEquals(aai.length, 2);
+      assertEquals(aai[0].getQueueName(), "root");
+      assertEquals(aai[1].getQueueName(), "default");
+      // test token
+      Token<DelegationTokenIdentifier> token = client
+          .getDelegationToken(new Text(UserGroupInformation.getCurrentUser()
+              .getShortUserName()));
+      assertEquals(token.getKind().toString(), "RM_DELEGATION_TOKEN");
+      
+      // renewing and cancelling RM delegation tokens is not supported
+      
+      try {
+        long l = client.renewDelegationToken(token);
+      } catch (UnsupportedOperationException e) {
+
+        assertTrue(e.getMessage().endsWith(
+            "is not supported  for RM_DELEGATION_TOKEN tokens"));
+      }
+      try {
+        client.cancelDelegationToken(token);
+      } catch (UnsupportedOperationException e) {
+        assertTrue(e.getMessage().endsWith(
+            "is not supported  for RM_DELEGATION_TOKEN tokens"));
+      }
+    } finally {
+      if (fileSys != null) {
+        fileSys.delete(testDir, true);
+      }
+      if (mr != null) {
+        mr.stop();
+      }
+    }
+  }
+
+  /**
+   * test BlackListInfo class
+   * 
+   * @throws IOException
+   */
+  @Test (timeout=10000)
+  public void testBlackListInfo() throws IOException {
+    BlackListInfo info = new BlackListInfo();
+    info.setBlackListReport("blackListInfo");
+    info.setReasonForBlackListing("reasonForBlackListing");
+    info.setTrackerName("trackerName");
+    ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(byteOut);
+    info.write(out);
+    BlackListInfo info2 = new BlackListInfo();
+    info2.readFields(new DataInputStream(new ByteArrayInputStream(byteOut
+        .toByteArray())));
+    assertEquals(info, info);
+    assertEquals(info.toString(), info.toString());
+    assertEquals(info.getTrackerName(), "trackerName");
+    assertEquals(info.getReasonForBlackListing(), "reasonForBlackListing");
+    assertEquals(info.getBlackListReport(), "blackListInfo");
+
+  }
+/**
+ * test running JobQueueClient from the command line
+ * @throws Exception
+ */
+  @Test (timeout=10000)
+  public void testJobQueueClient() throws Exception {
+    MiniMRClientCluster mr = null;
+    FileSystem fileSys = null;
+    PrintStream oldOut = System.out;
+    try {
+      Configuration conf = new Configuration();
+      mr = MiniMRClientClusterFactory.create(this.getClass(), 2, conf);
+
+      JobConf job = new JobConf(mr.getConfig());
+
+      fileSys = FileSystem.get(job);
+      fileSys.delete(testDir, true);
+      FSDataOutputStream out = fileSys.create(inFile, true);
+      out.writeBytes("This is a test file");
+      out.close();
+
+      FileInputFormat.setInputPaths(job, inFile);
+      FileOutputFormat.setOutputPath(job, outDir);
+
+      job.setInputFormat(TextInputFormat.class);
+      job.setOutputFormat(TextOutputFormat.class);
+
+      job.setMapperClass(IdentityMapper.class);
+      job.setReducerClass(IdentityReducer.class);
+      job.setNumReduceTasks(0);
+
+      JobClient client = new JobClient(mr.getConfig());
+
+      client.submitJob(job);
+
+      JobQueueClient jobClient = new JobQueueClient(job);
+
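+      // capture System.out to verify what the CLI prints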
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+      System.setOut(new PrintStream(bytes));
+      String[] arg = { "-list" };
+      jobClient.run(arg);
+      assertTrue(bytes.toString().contains("Queue Name : default"));
+      assertTrue(bytes.toString().contains("Queue State : running"));
+      bytes = new ByteArrayOutputStream();
+      System.setOut(new PrintStream(bytes));
+      String[] arg1 = { "-showacls" };
+      jobClient.run(arg1);
+      assertTrue(bytes.toString().contains("Queue acls for user :"));
+      assertTrue(bytes.toString().contains(
+          "root  ADMINISTER_QUEUE,SUBMIT_APPLICATIONS"));
+      assertTrue(bytes.toString().contains(
+          "default  ADMINISTER_QUEUE,SUBMIT_APPLICATIONS"));
+
+      // test for info and default queue
+
+      bytes = new ByteArrayOutputStream();
+      System.setOut(new PrintStream(bytes));
+      String[] arg2 = { "-info", "default" };
+      jobClient.run(arg2);
+      assertTrue(bytes.toString().contains("Queue Name : default"));
+      assertTrue(bytes.toString().contains("Queue State : running"));
+      assertTrue(bytes.toString().contains("Scheduling Info"));
+
+      // test for info , default queue and jobs
+      bytes = new ByteArrayOutputStream();
+      System.setOut(new PrintStream(bytes));
+      String[] arg3 = { "-info", "default", "-showJobs" };
+      jobClient.run(arg3);
+      assertTrue(bytes.toString().contains("Queue Name : default"));
+      assertTrue(bytes.toString().contains("Queue State : running"));
+      assertTrue(bytes.toString().contains("Scheduling Info"));
+      assertTrue(bytes.toString().contains("job_1"));
+
+      String[] arg4 = {};
+      jobClient.run(arg4);
+
+      
+    } finally {
+      System.setOut(oldOut);
+      if (fileSys != null) {
+        fileSys.delete(testDir, true);
+      }
+      if (mr != null) {
+        mr.stop();
+      }
+    }
+  }
 }

+ 72 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestQueueConfigurationParser.java

@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.StringWriter;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class TestQueueConfigurationParser {
+  /**
+   * test XML generation for a queue hierarchy
+   * @throws Exception
+   */
+  @Test (timeout=10000)
+  public void testQueueConfigurationParser() throws Exception {
+    JobQueueInfo info = new JobQueueInfo("root", "rootInfo");
+    JobQueueInfo infoChild1 = new JobQueueInfo("child1", "child1Info");
+    JobQueueInfo infoChild2 = new JobQueueInfo("child2", "child2Info");
+
+    info.addChild(infoChild1);
+    info.addChild(infoChild2);
+    DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory
+        .newInstance();
+    DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
+    Document document = builder.newDocument();
+
+    // test QueueConfigurationParser.getQueueElement.
+    Element e = QueueConfigurationParser.getQueueElement(document, info);
+    // transform result to string for check
+    DOMSource domSource = new DOMSource(e);
+    StringWriter writer = new StringWriter();
+    StreamResult result = new StreamResult(writer);
+    TransformerFactory tf = TransformerFactory.newInstance();
+    Transformer transformer = tf.newTransformer();
+    transformer.transform(domSource, result);
+    String str = writer.toString();
+
+    assertTrue(str
+        .endsWith("<queue><name>root</name><properties/><state>running</state><queue><name>child1</name><properties/><state>running</state></queue><queue><name>child2</name><properties/><state>running</state></queue></queue>"));
+  }
+}

+ 29 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestStatisticsCollector.java

@@ -17,13 +17,18 @@
  */
 package org.apache.hadoop.mapred;
 
-import junit.framework.TestCase;
+import java.util.Map;
+
 
 import org.apache.hadoop.mapred.StatisticsCollector.TimeWindow;
 import org.apache.hadoop.mapred.StatisticsCollector.Stat;
+import org.junit.Test;
 
-public class TestStatisticsCollector extends TestCase{
+import static org.junit.Assert.*;
+public class TestStatisticsCollector {
 
+  @SuppressWarnings("rawtypes")
+  @Test (timeout=20000)
   public void testMovingWindow() throws Exception {
     StatisticsCollector collector = new StatisticsCollector(1);
     TimeWindow window = new TimeWindow("test", 6, 2);
@@ -78,6 +83,28 @@ public class TestStatisticsCollector extends TestCase{
     collector.update();
     assertEquals((10+10+10+12+13+14), stat.getValues().get(window).getValue());
     assertEquals(95, stat.getValues().get(sincStart).getValue());
+
+    // test the Stat bookkeeping on the collector
+    Map updaters = collector.getUpdaters();
+    assertEquals(updaters.size(), 2);
+    Map<String, Stat> statistics = collector.getStatistics();
+    assertNotNull(statistics.get("m1"));
+
+    Stat newStat = collector.createStat("m2");
+    assertEquals(newStat.name, "m2");
+    Stat st = collector.removeStat("m1");
+    assertEquals(st.name, "m1");
+    assertEquals((10+10+10+12+13+14), stat.getValues().get(window).getValue());
+    assertEquals(95, stat.getValues().get(sincStart).getValue());
+    st = collector.removeStat("m1");
+    // removing the same stat again returns null
+    assertNull(st);
+    collector.start();
+    // wait 2.5 sec for the collector thread to update the windows
+    Thread.sleep(2500);
+    assertEquals(69, stat.getValues().get(window).getValue());
+    assertEquals(95, stat.getValues().get(sincStart).getValue());
+
   }
 
 }

+ 19 - 9
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java

@@ -61,11 +61,12 @@ public class TestTextInputFormat {
       throw new RuntimeException("init failure", e);
     }
   }
+  @SuppressWarnings("deprecation")
   private static Path workDir =
     new Path(new Path(System.getProperty("test.build.data", "/tmp")),
              "TestTextInputFormat").makeQualified(localFs);
 
-  @Test
+  @Test (timeout=100000)
   public void testFormat() throws Exception {
     JobConf job = new JobConf(defaultConf);
     Path file = new Path(workDir, "test.txt");
@@ -145,7 +146,7 @@ public class TestTextInputFormat {
     }
   }
 
-  @Test
+  @Test (timeout=500000)
   public void testSplitableCodecs() throws IOException {
     JobConf conf = new JobConf(defaultConf);
     int seed = new Random().nextInt();
@@ -250,7 +251,7 @@ public class TestTextInputFormat {
                                            bufsz);
   }
 
-  @Test
+  @Test (timeout=10000)
   public void testUTF8() throws Exception {
     LineReader in = makeStream("abcd\u20acbdcd\u20ac");
     Text line = new Text();
@@ -269,7 +270,7 @@ public class TestTextInputFormat {
    *
    * @throws Exception
    */
-  @Test
+  @Test (timeout=10000)
   public void testNewLines() throws Exception {
     final String STR = "a\nbb\n\nccc\rdddd\r\r\r\n\r\neeeee";
     final int STRLENBYTES = STR.getBytes().length;
@@ -309,7 +310,7 @@ public class TestTextInputFormat {
    *
    * @throws Exception
    */
-  @Test
+  @Test (timeout=10000)
   public void testMaxLineLength() throws Exception {
     final String STR = "a\nbb\n\nccc\rdddd\r\neeeee";
     final int STRLENBYTES = STR.getBytes().length;
@@ -334,7 +335,7 @@ public class TestTextInputFormat {
     }
   }
 
-  @Test
+  @Test (timeout=10000)
   public void testMRMaxLine() throws Exception {
     final int MAXPOS = 1024 * 1024;
     final int MAXLINE = 10 * 1024;
@@ -354,6 +355,9 @@ public class TestTextInputFormat {
         position += b.length;
         return b.length;
       }
+      public void reset() {
+        position = 0;
+      }
     };
     final LongWritable key = new LongWritable();
     final Text val = new Text();
@@ -362,8 +366,14 @@ public class TestTextInputFormat {
     conf.setInt(org.apache.hadoop.mapreduce.lib.input.
                 LineRecordReader.MAX_LINE_LENGTH, MAXLINE);
     conf.setInt("io.file.buffer.size", BUF); // used by LRR
-    final LineRecordReader lrr = new LineRecordReader(infNull, 0, MAXPOS, conf);
+    // test the other LineRecordReader constructor as well
+    LineRecordReader lrr = new LineRecordReader(infNull, 0, MAXPOS, conf);
+    assertFalse("Read a line from null", lrr.next(key, val));
+    infNull.reset();
+    lrr = new LineRecordReader(infNull, 0L, MAXLINE, MAXPOS);
     assertFalse("Read a line from null", lrr.next(key, val));
   }
 
   private static void writeFile(FileSystem fs, Path name, 
   /**
   /**
    * Test using the gzip codec for reading
    */
-  @Test
+  @Test (timeout=10000)
   public void testGzip() throws IOException {
     JobConf job = new JobConf(defaultConf);
     CompressionCodec gzip = new GzipCodec();
   /**
   /**
    * Test using the gzip codec and an empty input file
    */
-  @Test
+  @Test (timeout=10000)
   public void testGzipEmpty() throws IOException {
     JobConf job = new JobConf(defaultConf);
     CompressionCodec gzip = new GzipCodec();

+ 67 - 10
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextOutputFormat.java

@@ -19,12 +19,14 @@
 package org.apache.hadoop.mapred;
 
 import java.io.*;
-import junit.framework.TestCase;
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
+import org.junit.Test;
 
-public class TestTextOutputFormat extends TestCase {
+import static org.junit.Assert.*;
+
+public class TestTextOutputFormat {
   private static JobConf defaultConf = new JobConf();
 
   private static FileSystem localFs = null;
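Editor's note: this hunk applies the same JUnit 3 to JUnit 4 migration used elsewhere in the patch: drop junit.framework.TestCase, import org.junit.Test plus the static org.junit.Assert members, and mark each test method with @Test. A minimal sketch of the pattern (illustrative only; TestFoo is a hypothetical class):

    // Before (JUnit 3): tests are discovered by the test* naming convention.
    //   import junit.framework.TestCase;
    //   public class TestFoo extends TestCase {
    //     public void testBar() throws Exception { ... }
    //   }

    // After (JUnit 4): tests are discovered by annotation, and the timeout
    // (in milliseconds) fails the test if it hangs instead of blocking the build.
    import org.junit.Test;
    import static org.junit.Assert.*;

    public class TestFoo {
      @Test(timeout = 10000)
      public void testBar() throws Exception {
        assertEquals(2, 1 + 1); // assertions now come from org.junit.Assert
      }
    }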
@@ -43,8 +45,7 @@ public class TestTextOutputFormat extends TestCase {
                       new Path(System.getProperty("test.build.data", "."), 
                                "data"), 
                       FileOutputCommitter.TEMP_DIR_NAME), "_" + attempt);
-
-  @SuppressWarnings("unchecked")
+  @Test (timeout=10000)
   public void testFormat() throws Exception {
     JobConf job = new JobConf();
     job.set(JobContext.TASK_ATTEMPT_ID, attempt);
@@ -59,8 +60,8 @@ public class TestTextOutputFormat extends TestCase {
     // A reporter that does nothing
     Reporter reporter = Reporter.NULL;
 
-    TextOutputFormat theOutputFormat = new TextOutputFormat();
-    RecordWriter theRecordWriter =
+    TextOutputFormat<Object,Object> theOutputFormat = new TextOutputFormat<Object,Object>();
+    RecordWriter<Object,Object> theRecordWriter =
       theOutputFormat.getRecordWriter(localFs, job, file, reporter);
 
     Text key1 = new Text("key1");
@@ -95,7 +96,7 @@ public class TestTextOutputFormat extends TestCase {
 
   }
 
-  @SuppressWarnings("unchecked")
+  @Test (timeout=10000)
   public void testFormatWithCustomSeparator() throws Exception {
     JobConf job = new JobConf();
     String separator = "\u0001";
@@ -112,8 +113,8 @@ public class TestTextOutputFormat extends TestCase {
     // A reporter that does nothing
     Reporter reporter = Reporter.NULL;
 
-    TextOutputFormat theOutputFormat = new TextOutputFormat();
-    RecordWriter theRecordWriter =
+    TextOutputFormat<Object,Object> theOutputFormat = new TextOutputFormat<Object,Object>();
+    RecordWriter<Object,Object> theRecordWriter =
       theOutputFormat.getRecordWriter(localFs, job, file, reporter);
 
     Text key1 = new Text("key1");
@@ -147,7 +148,63 @@ public class TestTextOutputFormat extends TestCase {
     assertEquals(output, expectedOutput.toString());
 
   }
-
+  /**
+   * Test writing output with compression enabled.
+   * @throws IOException
+   */
+  @Test (timeout=10000)
+  public void testCompress() throws IOException {
+    JobConf job = new JobConf();
+    String separator = "\u0001";
+    job.set("mapreduce.output.textoutputformat.separator", separator);
+    job.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    job.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.COMPRESS, "true");
+
+    FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
+    FileOutputFormat.setWorkOutputPath(job, workDir);
+    FileSystem fs = workDir.getFileSystem(job);
+    if (!fs.mkdirs(workDir)) {
+      fail("Failed to create output directory");
+    }
+    String file = "test.txt";
+
+    // A reporter that does nothing
+    Reporter reporter = Reporter.NULL;
+
+    TextOutputFormat<Object,Object> theOutputFormat = new TextOutputFormat<Object,Object>();
+    RecordWriter<Object,Object> theRecordWriter =
+      theOutputFormat.getRecordWriter(localFs, job, file, reporter);
+    Text key1 = new Text("key1");
+    Text key2 = new Text("key2");
+    Text val1 = new Text("val1");
+    Text val2 = new Text("val2");
+    NullWritable nullWritable = NullWritable.get();
+
+    try {
+      theRecordWriter.write(key1, val1);
+      theRecordWriter.write(null, nullWritable);
+      theRecordWriter.write(null, val1);
+      theRecordWriter.write(nullWritable, val2);
+      theRecordWriter.write(key2, nullWritable);
+      theRecordWriter.write(key1, null);
+      theRecordWriter.write(null, null);
+      theRecordWriter.write(key2, val2);
+
+    } finally {
+      theRecordWriter.close(reporter);
+    }
+    File expectedFile = new File(new Path(workDir, file).toString());
+    StringBuffer expectedOutput = new StringBuffer();
+    expectedOutput.append(key1).append(separator).append(val1).append("\n");
+    expectedOutput.append(val1).append("\n");
+    expectedOutput.append(val2).append("\n");
+    expectedOutput.append(key2).append("\n");
+    expectedOutput.append(key1).append("\n");
+    expectedOutput.append(key2).append(separator).append(val2).append("\n");
+    String output = UtilsForTests.slurp(expectedFile);
+    assertEquals(output, expectedOutput.toString());
+  }
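The expectedOutput built above relies on how TextOutputFormat.LineRecordWriter treats null and NullWritable: a record where both sides are null or NullWritable writes nothing, and the separator appears only when both a real key and a real value are present. Paraphrased from the Hadoop source, for illustration:

    // Inside TextOutputFormat.LineRecordWriter (paraphrase):
    public synchronized void write(K key, V value) throws IOException {
      boolean nullKey = key == null || key instanceof NullWritable;
      boolean nullValue = value == null || value instanceof NullWritable;
      if (nullKey && nullValue) {
        return;                          // e.g. write(null, null) emits nothing
      }
      if (!nullKey) {
        writeObject(key);                // the key text
      }
      if (!(nullKey || nullValue)) {
        out.write(keyValueSeparator);    // separator only between a real key and value
      }
      if (!nullValue) {
        writeObject(value);              // the value text
      }
      out.write(newline);                // each emitted record ends with a newline
    }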
   public static void main(String[] args) throws Exception {
     new TestTextOutputFormat().testFormat();
   }
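One caveat on testCompress: when COMPRESS is enabled, the old-API TextOutputFormat.getRecordWriter appends the codec's default extension to the file name (test.txt.deflate for the default DefaultCodec), so slurping the raw test.txt path may not observe the written data. A hedged sketch of a helper that instead reads the output back through the codec (slurpCompressed is a hypothetical name; it assumes the default codec and the usual imports of java.io.BufferedReader, java.io.InputStreamReader, org.apache.hadoop.io.compress.CompressionCodec, org.apache.hadoop.io.compress.DefaultCodec, and org.apache.hadoop.util.ReflectionUtils):

    // Hypothetical helper: decompress and return the text written for `name`.
    private static String slurpCompressed(JobConf job, FileSystem fs,
                                          Path dir, String name) throws IOException {
      CompressionCodec codec = ReflectionUtils.newInstance(DefaultCodec.class, job);
      Path compressed = new Path(dir, name + codec.getDefaultExtension());
      BufferedReader reader = new BufferedReader(new InputStreamReader(
          codec.createInputStream(fs.open(compressed)), "UTF-8"));
      StringBuilder sb = new StringBuilder();
      for (String line; (line = reader.readLine()) != null; ) {
        sb.append(line).append('\n');
      }
      reader.close();
      return sb.toString();
    }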