Fix up the merges from the 205 branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-204@1137300 13f79535-47bb-0310-9956-ffa450edef68
Owen O'Malley 14 years ago
Parent commit: a42b4a64c9

+ 1 - 0
ivy/libraries.properties

@@ -47,6 +47,7 @@ hsqldb.version=1.8.0.10
 ivy.version=2.1.0
 
 jasper.version=5.5.12
+jackson.version=1.0.1
 #not able to figureout the version of jsp & jsp-api version to get it resolved throught ivy
 # but still declared here as we are going to have a local copy from the lib folder
 jsp.version=2.1

+ 1 - 1
src/contrib/build-contrib.xml

@@ -283,7 +283,7 @@
     <mkdir dir="${hadoop.log.dir}"/>
     <junit
       printsummary="yes" showoutput="${test.output}" 
-      haltonfailure="no" fork="yes" maxmemory="256m"
+      haltonfailure="no" fork="yes" maxmemory="512m"
       errorProperty="tests.failed" failureProperty="tests.failed"
       timeout="${test.timeout}">
       

+ 2 - 1
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -432,7 +432,8 @@ public class TestCapacityScheduler extends TestCase {
       queues.clear();
       for (String qName : newQueues) {
         try {
-          queues.put(qName, new Queue(qName, acls, Queue.QueueState.RUNNING));
+          queues.put(qName, new Queue(qName, acls, Queue.QueueState.RUNNING,
+                     QueueMetrics.create(qName, new Configuration())));
         } catch (Throwable t) {
           throw new RuntimeException("Unable to initialize queue " + qName, t);
         }

+ 24 - 0
src/contrib/data_join/ivy.xml

@@ -40,5 +40,29 @@
       name="commons-math"
       rev="${commons-math.version}"
       conf="common->default"/>
+    <dependency org="junit"
+      name="junit"
+      rev="${junit.version}"
+      conf="common->default"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty-util"
+      rev="${jetty-util.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty"
+      rev="${jetty.version}"
+      conf="common->default"/>
+    <dependency org="org.codehaus.jackson"
+      name="jackson-core-asl"
+      rev="${jackson.version}"
+      conf="common->default"/>
+    <dependency org="org.codehaus.jackson"
+      name="jackson-mapper-asl"
+      rev="${jackson.version}"
+      conf="common->default"/>
+    <dependency org="commons-httpclient"
+      name="commons-httpclient"
+      rev="${commons-httpclient.version}"
+      conf="common->master"/> 
     </dependencies>
 </ivy-module>

+ 26 - 2
src/contrib/fairscheduler/ivy.xml

@@ -47,8 +47,32 @@
       rev="${jsp-api-2.1.version}"
       conf="common->master"/>
     <dependency org="org.mortbay.jetty"
-      name="jsp-2.1"
-      rev="${jsp-2.1.version}"
+      name="jetty"
+      rev="${jetty.version}"
+      conf="common->default"/>
+    <dependency org="org.codehaus.jackson"
+      name="jackson-core-asl"
+      rev="${jackson.version}"
+      conf="common->default"/>
+    <dependency org="org.codehaus.jackson"
+      name="jackson-mapper-asl"
+      rev="${jackson.version}"
+      conf="common->default"/>
+    <dependency org="commons-httpclient"
+      name="commons-httpclient"
+      rev="${commons-httpclient.version}"
+      conf="common->master"/> 
+    <dependency org="commons-configuration"
+      name="commons-configuration"
+      rev="${commons-configuration.version}"
+      conf="common->master"/>
+    <dependency org="org.apache.commons"
+      name="commons-math"
+      rev="${commons-math.version}"
+      conf="common->master"/>
+    <dependency org="commons-lang"
+      name="commons-lang"
+      rev="${commons-lang.version}"
       conf="common->master"/>
   </dependencies>
 </ivy-module>

+ 4 - 2
src/contrib/hdfsproxy/build.xml

@@ -80,7 +80,8 @@
     </and>
   </condition>
 
-  <property name="ivy.settings.file" location="${hadoop.root}/ivy/ivysettings.xml"/>
+  <property name="ivy.settings.file" 
+            location="${hadoop.root}/ivy/ivysettings.xml"/>
 
   <target name="ivy-init" depends="ivy-init-antlib">
     <ivy:settings id="${ant.project.name}.ivy.settings"/>
@@ -239,7 +240,8 @@
     <copy file="${proxy.conf.dir}/tomcat-web.xml" todir="${src.test.resources}"/>
     <copy file="${proxy.conf.dir}/tomcat-forward-web.xml" todir="${src.test.resources}"/>
 
-    <junit fork="yes" printsummary="yes" errorProperty="tests.failed" failureProperty="tests.failed">
+    <junit fork="yes" printsummary="yes" errorProperty="tests.failed" 
+           failureProperty="tests.failed" showoutput="${test.output}">
       <classpath refid="test.classpath"/>
       <sysproperty key="test.build.data" value="${build.test}/data"/>
       <sysproperty key="build.test" value="${build.test}"/>

+ 16 - 10
src/contrib/hdfsproxy/src/test/org/apache/hadoop/hdfsproxy/TestHdfsProxy.java

@@ -26,6 +26,7 @@ import java.util.Random;
 
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.log4j.Level;
@@ -46,13 +47,8 @@ import org.apache.hadoop.security.authorize.ProxyUsers;
  * A JUnit test for HdfsProxy
  */
 public class TestHdfsProxy extends TestCase {
-  {
-    ((Log4JLogger) LogFactory.getLog("org.apache.hadoop.hdfs.StateChange"))
-        .getLogger().setLevel(Level.OFF);
-    ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.OFF);
-    ((Log4JLogger) FSNamesystem.LOG).getLogger().setLevel(Level.OFF);
-  }
 
+  static final Log LOG = LogFactory.getLog(TestHdfsProxy.class);
   static final URI LOCAL_FS = URI.create("file:///");
 
   private static final int NFILES = 10;
@@ -221,12 +217,13 @@ public class TestHdfsProxy extends TestCase {
       final FileSystem localfs = FileSystem.get(LOCAL_FS, dfsConf);
       final FileSystem hdfs = cluster.getFileSystem();
       final Configuration proxyConf = new Configuration(false);
-      proxyConf.set("hdfsproxy.dfs.namenode.address", hdfs.getUri().getHost() + ":"
-          + hdfs.getUri().getPort());
+      proxyConf.set("hdfsproxy.dfs.namenode.address", hdfs.getUri().getHost() +
+          ":" + hdfs.getUri().getPort());
       proxyConf.set("hdfsproxy.https.address", "localhost:0");
       final String namenode = hdfs.getUri().toString();
       if (namenode.startsWith("hdfs://")) {
         MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR + "/srcdat");
+
         hdfs.copyFromLocalFile
 	    (new Path("file:///" + TEST_ROOT_DIR + "/srcdat"),
              new Path(namenode + "/destdat" ));
@@ -240,7 +237,8 @@ public class TestHdfsProxy extends TestCase {
         final String realProxyAddr = proxyAddr.getHostName() + ":"
             + proxy.getPort();
         final Path proxyUrl = new Path("hftp://" + realProxyAddr);
-	final FileSystem hftp = proxyUrl.getFileSystem(dfsConf);
+        final FileSystem hftp = proxyUrl.getFileSystem(dfsConf);
+
         FileUtil.copy(hftp, new Path(proxyUrl, "/destdat"),
                       hdfs, new Path(namenode + "/copied1"),
                       false, true, proxyConf);
@@ -260,13 +258,21 @@ public class TestHdfsProxy extends TestCase {
         deldir(localfs, TEST_ROOT_DIR + "/srcdat");
         deldir(localfs, TEST_ROOT_DIR + "/copied2");
       }
-    } finally {
       if (cluster != null) {
         cluster.shutdown();
       }
       if (proxy != null) {
         proxy.stop();
       }
+    } catch (Exception t) {
+      LOG.fatal("caught exception in test", t);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+      if (proxy != null) {
+        proxy.stop();
+      }
+      throw t;
     }
   }
 }

+ 16 - 0
src/contrib/index/ivy.xml

@@ -32,6 +32,18 @@
       name="log4j"
       rev="${log4j.version}"
       conf="common->master"/>
+    <dependency org="junit"
+      name="junit"
+      rev="${junit.version}"
+      conf="common->default"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty-util"
+      rev="${jetty-util.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty"
+      rev="${jetty.version}"
+      conf="common->default"/>
     <dependency org="org.apache.lucene"
       name="lucene-core"
       rev="${lucene-core.version}"
@@ -44,5 +56,9 @@
       name="commons-math"
       rev="${commons-math.version}"
       conf="common->default"/>
+    <dependency org="org.codehaus.jackson"
+      name="jackson-core-asl"
+      rev="${jackson.version}"
+      conf="common->default"/>
     </dependencies>
 </ivy-module>

+ 0 - 1
src/contrib/streaming/ivy/libraries.properties

@@ -4,6 +4,5 @@
 #Please list the dependencies name with version if they are different from the ones 
 #listed in the global libraries.properties file (in alphabetical order)
 
-jackson.version=1.0.1
 commons-configuration.version=1.6
 commons-math.version=2.1

+ 7 - 4
src/core/org/apache/hadoop/fs/FileUtil.java

@@ -22,17 +22,20 @@ import java.io.*;
 import java.util.Enumeration;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
-import org.mortbay.log.Log;
 
 /**
  * A collection of file-processing util methods
  */
 public class FileUtil {
+  private static final Log LOG = LogFactory.getLog(FileUtil.class);
+
   /**
    * convert an array of FileStatus to an array of Path
    * 
@@ -598,9 +601,9 @@ public class FileUtil {
     try {
       shExec.execute();
     }catch(IOException e) {
-      if(Log.isDebugEnabled()) {
-        Log.debug("Error while changing permission : " + filename 
-            +" Exception: " + StringUtils.stringifyException(e));
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Error while changing permission : " + filename 
+                  +" Exception: " + StringUtils.stringifyException(e));
       }
     }
     return shExec.getExitCode();

+ 21 - 6
src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java

@@ -93,8 +93,8 @@ public class TrackerDistributedCacheManager {
   
   private static final Random random = new Random();
   
-  BaseDirManager baseDirManager = new BaseDirManager();
-  CleanupThread cleanupThread;
+  protected BaseDirManager baseDirManager = new BaseDirManager();
+  protected CleanupThread cleanupThread;
 
   public TrackerDistributedCacheManager(Configuration conf,
                                         TaskController controller
@@ -874,7 +874,7 @@ public class TrackerDistributedCacheManager {
   /**
    * A thread to check and cleanup the unused files periodically
    */
-  private class CleanupThread extends Thread {
+  protected class CleanupThread extends Thread {
     // How often do we check if we need to clean up cache files?
     private long cleanUpCheckPeriod = 60000L; // 1 minute
     public CleanupThread(Configuration conf) {
@@ -882,6 +882,7 @@ public class TrackerDistributedCacheManager {
         conf.getLong("mapreduce.tasktracker.distributedcache.checkperiod",
             cleanUpCheckPeriod);
     }
+
     private volatile boolean running = true;
     
     public void stopRunning() {
@@ -894,19 +895,33 @@ public class TrackerDistributedCacheManager {
         try {
           Thread.sleep(cleanUpCheckPeriod);
           baseDirManager.checkAndCleanup();
-        } catch (Exception e) {
+        } catch (IOException e) {
           LOG.error("Exception in DistributedCache CleanupThread.", e);
-          // This thread should keep running and never crash.
+        } catch(InterruptedException e) {
+          LOG.info("Cleanup...",e); 
+          //To force us to exit cleanly
+          running = false;
+        } catch (Throwable t) {
+          exitTaskTracker(t);
         }
       }
     }
+    
+    /**
+     * Exit the task tracker because of a fatal error.
+     */
+    protected void exitTaskTracker(Throwable t) {
+      LOG.fatal("Distributed Cache cleanup thread received runtime exception." +
+                " Exiting the TaskTracker", t);
+      Runtime.getRuntime().exit(-1);
+    }
   }
 
   /**
    * This class holds properties of each base directories and is responsible
    * for clean up unused cache files in base directories.
    */
-  private class BaseDirManager {
+  protected class BaseDirManager {
 
     // For holding the properties of each cache directory
     private class CacheDir {

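Widening `CleanupThread`, `BaseDirManager`, the `cleanupThread` field, and the `exitTaskTracker` hook to `protected` makes the cleanup path overridable, which matters mostly for tests. A minimal sketch of how a test might use this to record a fatal cleanup error instead of letting it kill the JVM; the class and field names (`CapturingCacheManager`, `fatalError`) are hypothetical, and the `throws IOException` on the outer constructor is assumed since the hunk above truncates the signature:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.TaskController;

// Hypothetical test helper: captures the fatal error that would otherwise
// terminate the TaskTracker via Runtime.getRuntime().exit(-1).
class CapturingCacheManager extends TrackerDistributedCacheManager {
  volatile Throwable fatalError;

  CapturingCacheManager(Configuration conf, TaskController controller)
      throws IOException {
    super(conf, controller);
    // Swap in the capturing thread via the now-protected field.
    cleanupThread = new CapturingCleanupThread(conf);
  }

  class CapturingCleanupThread extends CleanupThread {
    CapturingCleanupThread(Configuration conf) {
      super(conf);
    }

    @Override
    protected void exitTaskTracker(Throwable t) {
      fatalError = t;  // record instead of exiting the JVM
    }
  }
}
```
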
+ 36 - 4
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -91,6 +91,7 @@ public class JobInProgress {
   JobStatus status;
   String jobFile = null;
   Path localJobFile = null;
+  final QueueMetrics queueMetrics;
 
   TaskInProgress maps[] = new TaskInProgress[0];
   TaskInProgress reduces[] = new TaskInProgress[0];
@@ -339,8 +340,9 @@ public class JobInProgress {
     this.resourceEstimator = new ResourceEstimator(this);
     this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
     this.status.setUsername(conf.getUser());
+    String queueName = conf.getQueueName();
     this.profile = new JobProfile(conf.getUser(), jobid, "", "",
-                                  conf.getJobName(), conf.getQueueName());
+                                  conf.getJobName(), queueName);
     this.memoryPerMap = conf.getMemoryForMapTask();
     this.memoryPerReduce = conf.getMemoryForReduceTask();
     this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
@@ -377,6 +379,7 @@ public class JobInProgress {
       this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
       this.status.setUsername(jobInfo.getUser().toString());
       this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
+      // Add the queue-level metric below (after the profile has been initialized)
       this.startTime = jobtracker.getClock().getTime();
       status.setStartTime(startTime);
       this.localFs = jobtracker.getLocalFileSystem();
@@ -424,9 +427,9 @@ public class JobInProgress {
       
       this.priority = conf.getJobPriority();
       this.status.setJobPriority(this.priority);
+      String queueName = conf.getQueueName();
       this.profile = new JobProfile(user, jobId, 
-          jobFile, url, conf.getJobName(),
-          conf.getQueueName());
+          jobFile, url, conf.getJobName(), queueName);
 
       Queue queue = this.jobtracker.getQueueManager().getQueue(queueName);
       if (queue == null) {
@@ -487,7 +490,15 @@ public class JobInProgress {
       FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
     }
   }
-    
+
+  /**
+   * Get the QueueMetrics object associated with this job
+   * @return QueueMetrics
+   */
+  public QueueMetrics getQueueMetrics() {
+    return this.queueMetrics;
+  }
+
   private void checkTaskLimits() throws IOException {
     // if the number of tasks is larger than a configured value
     // then fail the job.
@@ -695,6 +706,8 @@ public class JobInProgress {
 
     jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
     jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
+    this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks);
+    this.queueMetrics.addWaitingReduces(getJobID(), numReduceTasks);
 
     maps = new TaskInProgress[numMapTasks];
     for(int i=0; i < numMapTasks; ++i) {
@@ -1695,6 +1708,7 @@ public class JobInProgress {
       if (tip.getActiveTasks().size() > 1)
         speculativeMapTasks++;
       metrics.launchMap(id);
+      this.queueMetrics.launchMap(id);
     } else {
       ++runningReduceTasks;
       name = Values.REDUCE.name();
@@ -1702,6 +1716,7 @@ public class JobInProgress {
       if (tip.getActiveTasks().size() > 1)
         speculativeReduceTasks++;
       metrics.launchReduce(id);
+      this.queueMetrics.launchReduce(id);
     }
     // Note that the logs are for the scheduled tasks only. Tasks that join on 
     // restart has already their logs in place.
@@ -1852,9 +1867,11 @@ public class JobInProgress {
     map.put(taskTracker, info);
     if (type == TaskType.MAP) {
       jobtracker.getInstrumentation().addReservedMapSlots(reservedSlots);
+      this.queueMetrics.addReservedMapSlots(reservedSlots);
     }
     else {
       jobtracker.getInstrumentation().addReservedReduceSlots(reservedSlots);
+      this.queueMetrics.addReservedReduceSlots(reservedSlots);
     }
     jobtracker.incrementReservations(type, reservedSlots);
   }
@@ -1884,10 +1901,12 @@ public class JobInProgress {
     map.remove(taskTracker);
     if (type == TaskType.MAP) {
       jobtracker.getInstrumentation().decReservedMapSlots(info.getNumSlots());
+      this.queueMetrics.decReservedMapSlots(info.getNumSlots());
     }
     else {
       jobtracker.getInstrumentation().decReservedReduceSlots(
         info.getNumSlots());
+      this.queueMetrics.decReservedReduceSlots(info.getNumSlots());
     }
     jobtracker.decrementReservations(type, info.getNumSlots());
   }
@@ -2596,6 +2615,7 @@ public class JobInProgress {
       }
       finishedMapTasks += 1;
       metrics.completeMap(taskid);
+      this.queueMetrics.completeMap(taskid);
       // remove the completed map from the resp running caches
       retireMap(tip);
       if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
@@ -2611,6 +2631,7 @@ public class JobInProgress {
       }
       finishedReduceTasks += 1;
       metrics.completeReduce(taskid);
+      this.queueMetrics.completeReduce(taskid);
       // remove the completed reduces from the running reducers set
       retireReduce(tip);
       if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
@@ -2655,14 +2676,18 @@ public class JobInProgress {
     //update the metrics
     if (oldState == JobStatus.PREP) {
       this.jobtracker.getInstrumentation().decPrepJob(conf, jobId);
+      this.queueMetrics.decPrepJob(conf, jobId);
     } else if (oldState == JobStatus.RUNNING) {
       this.jobtracker.getInstrumentation().decRunningJob(conf, jobId);
+      this.queueMetrics.decRunningJob(conf, jobId);
     }
     
     if (newState == JobStatus.PREP) {
       this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
+      this.queueMetrics.addPrepJob(conf, jobId);
     } else if (newState == JobStatus.RUNNING) {
       this.jobtracker.getInstrumentation().addRunningJob(conf, jobId);
+      this.queueMetrics.addRunningJob(conf, jobId);
     }
     
   }
@@ -2717,6 +2742,7 @@ public class JobInProgress {
       garbageCollect();
       
       metrics.completeJob(this.conf, this.status.getJobID());
+      this.queueMetrics.completeJob(this.conf, this.status.getJobID());
     }
   }
   
@@ -2757,9 +2783,11 @@ public class JobInProgress {
       if (jobTerminationState == JobStatus.FAILED) {
         jobtracker.getInstrumentation().failedJob(
             this.conf, this.status.getJobID());
+        this.queueMetrics.failedJob(this.conf, this.status.getJobID());
       } else {
         jobtracker.getInstrumentation().killedJob(
             this.conf, this.status.getJobID());
+        this.queueMetrics.killedJob(this.conf, this.status.getJobID());
       }
     }
   }
@@ -2910,9 +2938,11 @@ public class JobInProgress {
         if (tip.isMapTask() && !metricsDone) {
           runningMapTasks -= 1;
           metrics.failedMap(taskid);
+          this.queueMetrics.failedMap(taskid);
         } else if (!metricsDone) {
           runningReduceTasks -= 1;
           metrics.failedReduce(taskid);
+          this.queueMetrics.failedReduce(taskid);
         }
       }
       
@@ -3155,6 +3185,8 @@ public class JobInProgress {
       // Let the JobTracker know that a job is complete
       jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps());
       jobtracker.getInstrumentation().decWaitingReduces(getJobID(), pendingReduces());
+      this.queueMetrics.decWaitingMaps(getJobID(), pendingMaps());
+      this.queueMetrics.decWaitingReduces(getJobID(), pendingReduces());
       jobtracker.storeCompletedJob(this);
       jobtracker.finalizeJob(this);
 

+ 2 - 0
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -4028,6 +4028,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       }
     }
     myInstrumentation.submitJob(job.getJobConf(), jobId);
+    job.getQueueMetrics().submitJob(job.getJobConf(), jobId);
+
     LOG.info("Job " + jobId + " added successfully for user '" 
              + job.getJobConf().getUser() + "' to queue '" 
              + job.getJobConf().getQueueName() + "'");

+ 6 - 0
src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -61,6 +61,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
   private final TaskController taskController = new DefaultTaskController();
 
   private JobTrackerInstrumentation myMetrics = null;
+  private QueueMetrics queueMetrics = null;
 
   private static final String jobDir =  "localRunner/";
   
@@ -207,8 +208,10 @@ class LocalJobRunner implements JobSubmissionProtocol {
             map.setConf(localConf);
             map_tasks += 1;
             myMetrics.launchMap(mapId);
+            queueMetrics.launchMap(mapId);
             map.run(localConf, this);
             myMetrics.completeMap(mapId);
+            queueMetrics.completeMap(mapId);
             map_tasks -= 1;
             updateCounters(map);
           } else {
@@ -253,8 +256,10 @@ class LocalJobRunner implements JobSubmissionProtocol {
               reduce.setConf(localConf);
               reduce_tasks += 1;
               myMetrics.launchReduce(reduce.getTaskID());
+              queueMetrics.launchReduce(reduce.getTaskID());
               reduce.run(localConf, this);
               myMetrics.completeReduce(reduce.getTaskID());
+              queueMetrics.completeReduce(reduce.getTaskID());
               reduce_tasks -= 1;
               updateCounters(reduce);
             } else {
@@ -413,6 +418,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
     this.fs = FileSystem.getLocal(conf);
     this.conf = conf;
     myMetrics = JobTrackerInstrumentation.create(null, new JobConf(conf));
+    queueMetrics = QueueMetrics.create(conf.getQueueName(), new JobConf(conf));
     taskController.setConf(conf);
   }
 

+ 12 - 1
src/mapred/org/apache/hadoop/mapred/Queue.java

@@ -37,6 +37,7 @@ class Queue {
   private String name;
   private Map<String,AccessControlList> acls;
   private QueueState state = QueueState.RUNNING;
+  private QueueMetrics queueMetrics;
 
   /**
    * An Object that can be used by schedulers to fill in
@@ -69,10 +70,12 @@ class Queue {
    * @param acls ACLs for the queue
    * @param state state of the queue
    */
-  Queue(String name, Map<String, AccessControlList> acls, QueueState state) {
+  Queue(String name, Map<String, AccessControlList> acls, QueueState state,
+        QueueMetrics metrics) {
 	  this.name = name;
 	  this.acls = acls;
 	  this.state = state;
+	  this.queueMetrics = metrics;
   }
 
   /**
@@ -149,4 +152,12 @@ class Queue {
   void setSchedulingInfo(Object schedulingInfo) {
     this.schedulingInfo = schedulingInfo;
   }
+
+  /**
+   * Return the QueueMetrics object for this queue
+   * @return QueueMetrics
+   */
+  public QueueMetrics getMetrics() {
+    return this.queueMetrics;
+  }
 }

+ 12 - 2
src/mapred/org/apache/hadoop/mapred/QueueManager.java

@@ -118,7 +118,7 @@ class QueueManager {
         LOG.error("The queue, " + name + " does not have a configured ACL list");
       }
       queues.put(name, new Queue(name, getQueueAcls(name, conf),
-          getQueueState(name, conf)));
+          getQueueState(name, conf), QueueMetrics.create(name, conf)));
     }
     
     return queues;
@@ -136,7 +136,17 @@ class QueueManager {
   public synchronized Set<String> getQueues() {
     return queues.keySet();
   }
-  
+
+  /**
+   * Return a specific queue configured in the system.
+   * 
+   * @param queueName Name of the queue requested
+   * @return Queue object corresponding to queueName
+   */
+  public synchronized Queue getQueue(String queueName) {
+    return queues.get(queueName);
+  }
+
   /**
    * Return true if the given user is part of the ACL for the given
    * {@link QueueACL} name for the given queue.

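Together with the new `Queue.getMetrics()` accessor above, `getQueue(String)` lets a scheduler reach per-queue metrics by name. A minimal sketch under stated assumptions: the fragment lives in `org.apache.hadoop.mapred` (these classes are package-private), `QueueManager` keeps its existing `Configuration` constructor, and "default" is an illustrative queue name:

```java
Configuration conf = new Configuration();
QueueManager queueManager = new QueueManager(conf);

// getQueue(...) returns null for a name that is not configured, so guard it.
Queue queue = queueManager.getQueue("default");
if (queue != null) {
  QueueMetrics metrics = queue.getMetrics();
  metrics.addRunningJob(null, null);  // e.g. mirror a scheduler-side event
}
```
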
+ 208 - 0
src/mapred/org/apache/hadoop/mapred/QueueMetrics.java

@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.metrics2.MetricsBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MetricMutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.source.JvmMetricsSource;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+
+/**
+ * A metrics2 source that tracks per-queue task, slot, and job metrics.
+ */
+@SuppressWarnings("deprecation")
+class QueueMetrics implements MetricsSource {
+  private static final Log LOG =
+    LogFactory.getLog(QueueMetrics.class);
+
+  final MetricsRegistry registry = new MetricsRegistry("Queue");
+  final MetricMutableCounterInt mapsLaunched =
+      registry.newCounter("maps_launched", "", 0);
+  final MetricMutableCounterInt mapsCompleted =
+      registry.newCounter("maps_completed", "", 0);
+  final MetricMutableCounterInt mapsFailed =
+      registry.newCounter("maps_failed", "", 0);
+  final MetricMutableCounterInt redsLaunched =
+      registry.newCounter("reduces_launched", "", 0);
+  final MetricMutableCounterInt redsCompleted =
+      registry.newCounter("reduces_completed", "", 0);
+  final MetricMutableCounterInt redsFailed =
+      registry.newCounter("reduces_failed", "", 0);
+  final MetricMutableCounterInt jobsSubmitted =
+      registry.newCounter("jobs_submitted", "", 0);
+  final MetricMutableCounterInt jobsCompleted =
+      registry.newCounter("jobs_completed", "", 0);
+  final MetricMutableGaugeInt waitingMaps =
+      registry.newGauge("waiting_maps", "", 0);
+  final MetricMutableGaugeInt waitingReds =
+      registry.newGauge("waiting_reduces", "", 0);
+  final MetricMutableGaugeInt reservedMapSlots =
+      registry.newGauge("reserved_map_slots", "", 0);
+  final MetricMutableGaugeInt reservedRedSlots =
+      registry.newGauge("reserved_reduce_slots", "", 0);
+  final MetricMutableCounterInt jobsFailed =
+      registry.newCounter("jobs_failed", "", 0);
+  final MetricMutableCounterInt jobsKilled =
+      registry.newCounter("jobs_killed", "", 0);
+  final MetricMutableGaugeInt jobsPreparing =
+      registry.newGauge("jobs_preparing", "", 0);
+  final MetricMutableGaugeInt jobsRunning =
+      registry.newGauge("jobs_running", "", 0);
+  final MetricMutableCounterInt mapsKilled =
+      registry.newCounter("maps_killed", "", 0);
+  final MetricMutableCounterInt redsKilled =
+      registry.newCounter("reduces_killed", "", 0);
+
+  final String sessionId;
+  private String queueName;
+
+  public QueueMetrics(String queueName, Configuration conf) {
+    this.queueName = queueName;
+    sessionId = conf.get("session.id", "");
+    registry.setContext("mapred").tag("sessionId", "", sessionId);
+    registry.tag("Queue", "Metrics by queue", queueName);
+  }
+
+  public String getQueueName() {
+    return this.queueName;
+  }
+
+  public void getMetrics(MetricsBuilder builder, boolean all) {
+    registry.snapshot(builder.addRecord(registry.name()), all);
+  }
+
+  public void launchMap(TaskAttemptID taskAttemptID) {
+    mapsLaunched.incr();
+    decWaitingMaps(taskAttemptID.getJobID(), 1);
+  }
+
+  public void completeMap(TaskAttemptID taskAttemptID) {
+    mapsCompleted.incr();
+  }
+
+  public void failedMap(TaskAttemptID taskAttemptID) {
+    mapsFailed.incr();
+    addWaitingMaps(taskAttemptID.getJobID(), 1);
+  }
+
+  public void launchReduce(TaskAttemptID taskAttemptID) {
+    redsLaunched.incr();
+    decWaitingReduces(taskAttemptID.getJobID(), 1);
+  }
+
+  public void completeReduce(TaskAttemptID taskAttemptID) {
+    redsCompleted.incr();
+  }
+
+  public void failedReduce(TaskAttemptID taskAttemptID) {
+    redsFailed.incr();
+    addWaitingReduces(taskAttemptID.getJobID(), 1);
+  }
+
+  public void submitJob(JobConf conf, JobID id) {
+    jobsSubmitted.incr();
+  }
+
+  public void completeJob(JobConf conf, JobID id) {
+    jobsCompleted.incr();
+  }
+
+  public void addWaitingMaps(JobID id, int task) {
+    waitingMaps.incr(task);
+  }
+
+  public void decWaitingMaps(JobID id, int task) {
+    waitingMaps.decr(task);
+  }
+
+  public void addWaitingReduces(JobID id, int task) {
+    waitingReds.incr(task);
+  }
+
+  public void decWaitingReduces(JobID id, int task){
+    waitingReds.decr(task);
+  }
+
+  public void addReservedMapSlots(int slots) {
+    reservedMapSlots.incr(slots);
+  }
+
+  public void decReservedMapSlots(int slots) {
+    reservedMapSlots.decr(slots);
+  }
+
+  public void addReservedReduceSlots(int slots) {
+    reservedRedSlots.incr(slots);
+  }
+
+  public void decReservedReduceSlots(int slots) {
+    reservedRedSlots.decr(slots);
+  }
+
+  public void failedJob(JobConf conf, JobID id) {
+    jobsFailed.incr();
+  }
+
+  public void killedJob(JobConf conf, JobID id) {
+    jobsKilled.incr();
+  }
+
+  public void addPrepJob(JobConf conf, JobID id) {
+    jobsPreparing.incr();
+  }
+
+  public void decPrepJob(JobConf conf, JobID id) {
+    jobsPreparing.decr();
+  }
+
+  public void addRunningJob(JobConf conf, JobID id) {
+    jobsRunning.incr();
+  }
+
+  public void decRunningJob(JobConf conf, JobID id) {
+    jobsRunning.decr();
+  }
+
+  public void killedMap(TaskAttemptID taskAttemptID) {
+    mapsKilled.incr();
+  }
+
+  public void killedReduce(TaskAttemptID taskAttemptID) {
+    redsKilled.incr();
+  }
+
+  static QueueMetrics create(String queueName, Configuration conf) {
+    return create(queueName, conf, DefaultMetricsSystem.INSTANCE);
+  }
+
+  static QueueMetrics create(String queueName, Configuration conf,
+                                     MetricsSystem ms) {
+    return ms.register("QueueMetrics,q=" + queueName, "Queue metrics",
+                       new QueueMetrics(queueName, conf));
+  }
+
+}

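Each instance registers itself as a metrics2 source named `QueueMetrics,q=<queue>`. A minimal standalone sketch of that registration; the demo class is hypothetical, the "jobtracker" prefix is only an example (the JobTracker normally initializes the metrics system once at startup), and `DefaultMetricsSystem.initialize` is assumed to be the metrics2 entry point on this branch:

```java
package org.apache.hadoop.mapred;  // QueueMetrics is package-private

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;

public class QueueMetricsDemo {  // hypothetical demo class
  public static void main(String[] args) {
    // Start the default metrics system under an example prefix.
    DefaultMetricsSystem.initialize("jobtracker");

    // Registers a source named "QueueMetrics,q=default".
    QueueMetrics metrics = QueueMetrics.create("default", new Configuration());

    // The JobInProgress/JobTracker hooks drive these counters and gauges;
    // the JobConf/JobID arguments are unused, so null is accepted here.
    metrics.submitJob(null, null);    // jobs_submitted += 1
    metrics.addWaitingMaps(null, 4);  // waiting_maps gauge += 4
  }
}
```
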
+ 145 - 0
src/test/org/apache/hadoop/mapred/TestQueueMetrics.java

@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.metrics2.MetricsBuilder;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.test.MetricsAsserts;
+import org.mockito.Mockito;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@SuppressWarnings("deprecation")
+public class TestQueueMetrics extends TestCase {
+
+  QueueMetrics metrics = Mockito.mock(QueueMetrics.class);
+  static int jobIdCounter = 0;
+  static final String jtIdentifier = "queue_jt";
+
+  private static JobID getJobId() {
+    return new JobID(TestQueueMetrics.jtIdentifier, jobIdCounter++);
+  }
+
+  public void testDefaultSingleQueueMetrics() {
+    String queueName = "single";
+    TaskAttemptID taskAttemptID = Mockito.mock(TaskAttemptID.class);
+    when(taskAttemptID.getJobID()).thenReturn(TestQueueMetrics.getJobId());
+
+    QueueMetrics metrics = QueueMetrics.create(queueName, new Configuration());
+
+    assertEquals(metrics.getQueueName(), "single");
+    metrics.launchMap(taskAttemptID);
+    checkMaps(metrics, 1, 0, 0, 0, -1, 0);
+    metrics.addWaitingMaps(taskAttemptID.getJobID(), 5);
+    metrics.launchMap(taskAttemptID);
+    checkMaps(metrics, 2, 0, 0, 0, 3, 0);
+    checkReduces(metrics, 0, 0, 0, 0, 0, 0);
+
+    metrics.completeMap(taskAttemptID);
+    metrics.failedMap(taskAttemptID);
+    checkMaps(metrics, 2, 1, 1, 0, 4, 0);
+    checkReduces(metrics, 0, 0, 0, 0, 0, 0);
+
+    metrics.launchReduce(taskAttemptID);
+    metrics.completeReduce(taskAttemptID);
+    metrics.failedReduce(taskAttemptID);
+    checkMaps(metrics, 2, 1, 1, 0, 4, 0);
+    checkReduces(metrics, 1, 1, 1, 0, 0, 0);
+
+    metrics.addWaitingMaps(null, 20);
+    metrics.decWaitingMaps(null, 10);
+    metrics.addWaitingReduces(null, 20);
+    metrics.decWaitingReduces(null, 10);
+    checkMaps(metrics, 2, 1, 1, 0, 14, 0);
+    checkReduces(metrics, 1, 1, 1, 0, 10, 0);
+
+    metrics.addReservedMapSlots(10);
+    metrics.addReservedReduceSlots(10);
+    checkMaps(metrics, 2, 1, 1, 0, 14, 10);
+    checkReduces(metrics, 1, 1, 1, 0, 10, 10);
+    metrics.decReservedReduceSlots(5);
+    metrics.decReservedMapSlots(5);
+    checkMaps(metrics, 2, 1, 1, 0, 14, 5);
+    checkReduces(metrics, 1, 1, 1, 0, 10, 5);
+
+    metrics.killedMap(taskAttemptID);
+    metrics.killedReduce(taskAttemptID);
+    checkMaps(metrics, 2, 1, 1, 1, 14, 5);
+    checkReduces(metrics, 1, 1, 1, 1, 10, 5);
+    checkJobs(metrics, 0, 0, 0, 0, 0, 0);  
+
+    metrics.submitJob(null, null);
+    metrics.completeJob(null, null);
+    metrics.failedJob(null, null);
+    metrics.killedJob(null, null);
+    checkJobs(metrics, 1, 1, 1, 1, 0, 0);
+
+    metrics.addPrepJob(null, null);
+    metrics.addRunningJob(null, null);
+    metrics.addPrepJob(null, null);
+    metrics.addRunningJob(null, null);
+    checkJobs(metrics, 1, 1, 1, 1, 2, 2);
+    metrics.decPrepJob(null, null);
+    metrics.decRunningJob(null, null);
+    checkJobs(metrics, 1, 1, 1, 1, 1, 1);
+    checkMaps(metrics, 2, 1, 1, 1, 14, 5);
+    checkReduces(metrics, 1, 1, 1, 1, 10, 5);
+  }
+
+  public static void checkMaps(QueueMetrics metrics,
+      int maps_launched, int maps_completed, int maps_failed, int maps_killed,
+      int waiting_maps, int reserved_map_slots) {
+    assertCounter("maps_launched", maps_launched, metrics);
+    assertCounter("maps_completed", maps_completed, metrics);
+    assertCounter("maps_failed", maps_failed, metrics);
+    assertCounter("maps_killed", maps_killed, metrics);
+    assertGauge("waiting_maps", waiting_maps, metrics);
+    assertGauge("reserved_map_slots", reserved_map_slots, metrics);
+  }
+
+  public static void checkReduces(QueueMetrics metrics,
+      int reduces_launched, int reduces_completed, int reduces_failed,
+      int reduces_killed, int waiting_reduces, int reserved_reduce_slots) {
+    assertCounter("reduces_launched", reduces_launched, metrics);
+    assertCounter("reduces_completed", reduces_completed, metrics);
+    assertCounter("reduces_failed", reduces_failed, metrics);
+    assertCounter("reduces_killed", reduces_killed, metrics);
+    assertGauge("waiting_reduces", waiting_reduces, metrics);
+    assertGauge("reserved_reduce_slots", reserved_reduce_slots, metrics);
+  }
+
+  public static void checkJobs(QueueMetrics metrics, int jobs_submitted, int jobs_completed,
+      int jobs_failed, int jobs_killed, int jobs_preparing, int jobs_running) {
+    assertCounter("jobs_submitted", jobs_submitted, metrics);
+    assertCounter("jobs_completed", jobs_completed, metrics);
+    assertCounter("jobs_failed", jobs_failed, metrics);
+    assertCounter("jobs_killed", jobs_killed, metrics);
+    assertGauge("jobs_preparing", jobs_preparing, metrics);
+    assertGauge("jobs_running", jobs_running, metrics);    
+  }
+}