Bladeren bron

HADOOP-4035. Support memory based scheduling in capacity scheduler. Contributed by Vinod Kumar Vavilapalli.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@722760 13f79535-47bb-0310-9956-ffa450edef68
Hemanth Yamijala 16 jaren geleden
bovenliggende
commit
353bd05c0e
28 gewijzigde bestanden met toevoegingen van 2290 en 501 verwijderingen
  1. 3 0
      CHANGES.txt
  2. 31 0
      conf/capacity-scheduler.xml.template
  3. 137 0
      conf/hadoop-default.xml
  4. 68 0
      src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
  5. 266 57
      src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
  6. 2 1
      src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
  7. 246 0
      src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
  8. 421 27
      src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
  9. 12 2
      src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
  10. 132 0
      src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java
  11. 74 0
      src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java
  12. 2 2
      src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java
  13. 4 1
      src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
  14. 146 22
      src/mapred/org/apache/hadoop/mapred/JobConf.java
  15. 19 3
      src/mapred/org/apache/hadoop/mapred/JobInProgress.java
  16. 4 1
      src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java
  17. 6 1
      src/mapred/org/apache/hadoop/mapred/JobTracker.java
  18. 1 1
      src/mapred/org/apache/hadoop/mapred/JvmManager.java
  19. 17 8
      src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
  20. 241 76
      src/mapred/org/apache/hadoop/mapred/TaskTracker.java
  21. 18 1
      src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
  22. 79 35
      src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
  23. 49 0
      src/test/org/apache/hadoop/mapred/DummyMemoryCalculatorPlugin.java
  24. 0 242
      src/test/org/apache/hadoop/mapred/TestHighRAMJobs.java
  25. 12 2
      src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
  26. 241 0
      src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java
  27. 57 17
      src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java
  28. 2 2
      src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java

+ 3 - 0
CHANGES.txt

@@ -44,6 +44,9 @@ Trunk (unreleased changes)
     HADOOP-4422. S3 file systems should not create bucket.
     (David Phillips via tomwhite)
 
+    HADOOP-4035. Support memory based scheduling in capacity scheduler.
+    (Vinod Kumar Vavilapalli via yhemanth)
+
   NEW FEATURES
 
     HADOOP-4575. Add a proxy service for relaying HsftpFileSystem requests.

+ 31 - 0
conf/capacity-scheduler.xml.template

@@ -73,6 +73,37 @@
       account in scheduling decisions by default in a job queue.
     </description>
   </property>
+
+  <property>
+    <name>mapred.capacity-scheduler.task.default-pmem-percentage-in-vmem</name>
+    <value>-1</value>
+    <description>If mapred.task.maxpmem is set to -1, this configuration will
+      be used to calculate job's physical memory requirements as a percentage of
+      the job's virtual memory requirements set via mapred.task.maxvmem. This
+      property thus provides default value of physical memory for job's that
+      don't explicitly specify physical memory requirements.
+
+      If not explicitly set to a valid value, scheduler will not consider
+      physical memory for scheduling even if virtual memory based scheduling is
+      enabled(by setting valid values for both mapred.task.default.maxvmem and
+      mapred.task.limit.maxvmem).
+    </description>
+  </property>
+
+  <property>
+    <name>mapred.capacity-scheduler.task.limit.maxpmem</name>
+    <value>-1</value>
+    <description>Configuration that provides an upper limit on the maximum
+      physical memory that can be specified by a job. The job configuration
+      mapred.task.maxpmem should be less than this value. If not, the job will
+      be rejected by the scheduler.
+      
+      If it is set to -1, scheduler will not consider physical memory for
+      scheduling even if virtual memory based scheduling is enabled(by setting
+      valid values for both mapred.task.default.maxvmem and
+      mapred.task.limit.maxvmem).
+    </description>
+  </property>
   
   <property>
     <name>mapred.capacity-scheduler.default-minimum-user-limit-percent</name>

+ 137 - 0
conf/hadoop-default.xml

@@ -739,6 +739,143 @@ creations/deletions), or "all".</description>
   </description>
 </property>
 
+<property>
+  <name>mapred.tasktracker.vmem.reserved</name>
+  <value>-1</value>
+  <description>Configuration property to specify the amount of virtual memory
+    that has to be reserved by the TaskTracker for system usage (OS, TT etc).
+    The reserved virtual memory should be a part of the total virtual memory
+    available on the TaskTracker.
+    
+    The reserved virtual memory and the total virtual memory values are
+    reported by the TaskTracker as part of heart-beat so that they can
+    considered by a scheduler. Please refer to the documentation of the
+    configured scheduler to see how this property is used.
+    
+    These two values are also used by a TaskTracker for tracking tasks' memory
+    usage. Memory management functionality on a TaskTracker is disabled if this
+    property is set to -1, if it more than the total virtual memory on the 
+    tasktracker, or if either of the values is negative.
+  </description>
+</property>
+
+<property>
+  <name>mapred.tasktracker.pmem.reserved</name>
+  <value>-1</value>
+  <description>Configuration property to specify the amount of physical memory
+    that has to be reserved by the TaskTracker for system usage (OS, TT etc).
+    The reserved physical memory should be a part of the total physical memory
+    available on the TaskTracker.
+
+    The reserved physical memory and the total physical memory values are
+    reported by the TaskTracker as part of heart-beat so that they can
+    considered by a scheduler. Please refer to the documentation of the
+    configured scheduler to see how this property is used.
+  </description>
+</property>
+
+<property>
+  <name>mapred.task.default.maxvmem</name>
+  <value>-1</value>
+  <description>
+    Cluster-wide configuration in bytes to be set by the administrators that
+    provides default amount of maximum virtual memory for job's tasks. This has
+    to be set on both the JobTracker node for the sake of scheduling decisions
+    and on the TaskTracker nodes for the sake of memory management.
+
+    If a job doesn't specify its virtual memory requirement by setting
+    mapred.task.maxvmem to -1, tasks are assured a memory limit set
+    to this property. This property is set to -1 by default.
+
+    This value should in general be less than the cluster-wide
+    configuration mapred.task.limit.maxvmem. If not or if it is not set,
+    TaskTracker's memory management will be disabled and a scheduler's memory
+    based scheduling decisions may be affected. Please refer to the
+    documentation of the configured scheduler to see how this property is used.
+  </description>
+</property>
+
+<property>
+  <name>mapred.task.limit.maxvmem</name>
+  <value>-1</value>
+  <description>
+    Cluster-wide configuration in bytes to be set by the site administrators
+    that provides an upper limit on the maximum virtual memory that can be
+    specified by a job via mapred.task.maxvmem. This has to be set on both the
+    JobTracker node for the sake of scheduling decisions and on the TaskTracker
+    nodes for the sake of memory management.
+    
+    The job configuration mapred.task.maxvmem should not be more than this
+    value, otherwise depending on the scheduler being configured, the job may
+    be rejected or the job configuration may just be ignored. Please refer to
+    the documentation of the configured scheduler to see how this property is
+    used.
+
+    If it is not set a TaskTracker, TaskTracker's memory management will be
+    disabled.
+  </description>
+</property>
+
+<property>
+  <name>mapred.task.maxvmem</name>
+  <value>-1</value>
+  <description>
+    The maximum amount of virtual memory any task of a job will use, in bytes.
+
+    This value will be used by TaskTrackers for monitoring the memory usage of
+    tasks of this jobs. If a TaskTracker's memory management functionality is
+    enabled, each task of this job will be allowed to use a maximum virtual
+    memory specified by this property. If the task's memory usage goes over 
+    this value, the task will be failed by the TT. If not set, the
+    cluster-wide configuration mapred.task.default.maxvmem is used as the
+    default value for memory requirements. If this property cascaded with
+    mapred.task.default.maxvmem becomes equal to -1, the job's tasks will
+    not be assured any particular amount of virtual memory and may be killed by
+    a TT that intends to control the total memory usage of the tasks via memory
+    management functionality. If the memory management functionality is
+    disabled on a TT, this value is ignored.
+
+    This value should not be more than the cluster-wide configuration
+    mapred.task.limit.maxvmem.
+
+    This value may be used by schedulers that support scheduling based on job's
+    memory requirements. Please refer to the documentation of the scheduler
+    being configured to see if it does memory based scheduling and if it does,
+    how this property is used by that scheduler.
+  </description>
+</property>
+
+<property>
+  <name>mapred.task.maxpmem</name>name>
+  <value>-1</value>
+  <description>
+   The maximum amount of physical memory any task of a job will use in bytes.
+
+   This value may be used by schedulers that support scheduling based on job's
+   memory requirements. In general, a task of this job will be scheduled on a
+   TaskTracker, only if the amount of physical memory still unoccupied on the
+   TaskTracker is greater than or equal to this value. Different schedulers can
+   take different decisions, some might just ignore this value. Please refer to
+   the documentation of the scheduler being configured to see if it does
+   memory based scheduling and if it does, how this variable is used by that
+   scheduler.
+  </description>
+</property>
+
+<property>
+  <name>mapred.tasktracker.memory_calculator_plugin</name>
+  <value></value>
+  <description>
+   Name of the class whose instance will be used to query memory information
+   on the tasktracker.
+   
+   The class must be an instance of 
+   org.apache.hadoop.util.MemoryCalculatorPlugin. If the value is null, the
+   tasktracker attempts to use a class appropriate to the platform. 
+   Currently, the only platform supported is Linux.
+  </description>
+</property>
+
 <property>
   <name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name>
   <value>5000</value>

+ 68 - 0
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java

@@ -43,6 +43,34 @@ class CapacitySchedulerConf {
   private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX = 
     "mapred.capacity-scheduler.queue.";
 
+  /**
+   * If {@link JobConf#MAPRED_TASK_MAXPMEM_PROPERTY} is set to
+   * {@link JobConf#DISABLED_MEMORY_LIMIT}, this configuration will be used to
+   * calculate job's physical memory requirements as a percentage of the job's
+   * virtual memory requirements set via
+   * {@link JobConf#setMaxVirtualMemoryForTask()}. This property thus provides
+   * default value of physical memory for job's that don't explicitly specify
+   * physical memory requirements.
+   * 
+   * It defaults to {@link JobConf#DISABLED_MEMORY_LIMIT} and if not explicitly
+   * set to a valid value, scheduler will not consider physical memory for
+   * scheduling even if virtual memory based scheduling is enabled.
+   */
+  static String DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY =
+      "mapred.capacity-scheduler.task.default-pmem-percentage-in-vmem";
+
+  /**
+   * Configuration that provides an upper limit on the maximum physical memory
+   * that can be specified by a job. The job configuration
+   * {@link JobConf#MAPRED_TASK_MAXPMEM_PROPERTY} should,
+   * by definition, be less than this value. If not, the job will be rejected
+   * by the scheduler. If it is set to {@link JobConf#DISABLED_MEMORY_LIMIT},
+   * scheduler will not consider physical memory for scheduling even if virtual
+   * memory based scheduling is enabled.
+   */
+  static final String UPPER_LIMIT_ON_TASK_PMEM_PROPERTY =
+      "mapred.capacity-scheduler.task.limit.maxpmem";
+
   private Configuration rmConf;
 
   private int defaultMaxJobsPerUsersToInitialize;
@@ -348,4 +376,44 @@ class CapacitySchedulerConf {
     rmConf.setInt(
         "mapred.capacity-scheduler.init-worker-threads", poolSize);
   }
+
+  /**
+   * Get the upper limit on the maximum physical memory that can be specified by
+   * a job.
+   * 
+   * @return upper limit for max pmem for tasks.
+   */
+  public long getLimitMaxPmemForTasks() {
+    return rmConf.getLong(UPPER_LIMIT_ON_TASK_PMEM_PROPERTY,
+        JobConf.DISABLED_MEMORY_LIMIT);
+  }
+
+  /**
+   * Get the upper limit on the maximum physical memory that can be specified by
+   * a job.
+   * 
+   * @param value
+   */
+  public void setLimitMaxPmemForTasks(long value) {
+    rmConf.setLong(UPPER_LIMIT_ON_TASK_PMEM_PROPERTY, value);
+  }
+
+  /**
+   * Get cluster-wide default percentage of pmem in vmem.
+   * 
+   * @return cluster-wide default percentage of pmem in vmem.
+   */
+  public float getDefaultPercentOfPmemInVmem() {
+    return rmConf.getFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY,
+        JobConf.DISABLED_MEMORY_LIMIT);
+  }
+
+  /**
+   * Set cluster-wide default percentage of pmem in vmem.
+   * 
+   * @param value
+   */
+  public void setDefaultPercentOfPmemInVmem(float value) {
+    rmConf.setFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY, value);
+  }
 }

+ 266 - 57
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java

@@ -229,8 +229,40 @@ class CapacityTaskScheduler extends TaskScheduler {
       return sb.toString();
     }
   }
-  
-  
+
+  private static enum TaskLookUpStatus {
+    TASK_FOUND,
+    NO_TASK_IN_JOB,
+    NO_TASK_IN_QUEUE,
+    NO_TASK_MATCHING_MEMORY_REQUIREMENTS,
+  }
+
+  private static class TaskLookupResult {
+
+    private Task task;
+    private String lookupStatusInfo;
+
+    private TaskLookUpStatus lookUpStatus;
+
+    TaskLookupResult(Task t, TaskLookUpStatus lUStatus, String statusInfo) {
+      this.task = t;
+      this.lookUpStatus = lUStatus;
+      this.lookupStatusInfo = statusInfo;
+    }
+
+    Task getTask() {
+      return task;
+    }
+
+    TaskLookUpStatus getLookUpStatus() {
+      return lookUpStatus;
+    }
+
+    String getLookupStatusInfo() {
+      return lookupStatusInfo;
+    }
+  }
+
   /** 
    * This class handles the scheduling algorithms. 
    * The algos are the same for both Map and Reduce tasks. 
@@ -247,7 +279,11 @@ class CapacityTaskScheduler extends TaskScheduler {
     /** our enclosing TaskScheduler object */
     protected CapacityTaskScheduler scheduler;
     // for debugging
-    protected String type = null;
+    protected static enum TYPE {
+      MAP, REDUCE
+    }
+
+    protected TYPE type = null;
 
     abstract Task obtainNewTask(TaskTrackerStatus taskTracker, 
         JobInProgress job) throws IOException; 
@@ -538,7 +574,6 @@ class CapacityTaskScheduler extends TaskScheduler {
       }
     }
 
-    
     void jobAdded(JobInProgress job) {
       // update qsi 
       QueueSchedulingInfo qsi = 
@@ -558,6 +593,7 @@ class CapacityTaskScheduler extends TaskScheduler {
       LOG.debug("Job " + job.getJobID().toString() + " is added under user " 
                 + job.getProfile().getUser() + ", user now has " + i + " jobs");
     }
+
     void jobRemoved(JobInProgress job) {
       // update qsi 
       QueueSchedulingInfo qsi = 
@@ -627,61 +663,133 @@ class CapacityTaskScheduler extends TaskScheduler {
         return false;
       }
     }
-    
-    private Task getTaskFromQueue(TaskTrackerStatus taskTracker, 
-        QueueSchedulingInfo qsi) throws IOException {
-      Task t = null;
+
+    private TaskLookupResult getTaskFromQueue(TaskTrackerStatus taskTracker,
+        QueueSchedulingInfo qsi)
+        throws IOException {
+
       // keep track of users over limit
       Set<String> usersOverLimit = new HashSet<String>();
-      // look at running jobs first
-      for (JobInProgress j:
-        scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
-        // some jobs may be in the running queue but may have completed 
+
+      // Look at running jobs first, skipping jobs of those users who are over
+      // their limits
+      TaskLookupResult result =
+          getTaskFromRunningJobQueue(taskTracker, qsi, usersOverLimit, true);
+      TaskLookUpStatus lookUpStatus = result.getLookUpStatus();
+      if (lookUpStatus == TaskLookUpStatus.TASK_FOUND
+          || lookUpStatus == TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS) {
+        // No need for looking elsewhere
+        return result;
+      }
+
+      // if we're here, we haven't found anything. This could be because
+      // there is nothing to run, or that the user limit for some user is
+      // too strict, i.e., there's at least one user who doesn't have
+      // enough tasks to satisfy his limit. If it's the later case, look at
+      // jobs without considering user limits, and get task from first
+      // eligible job
+      if (usersOverLimit.size() > 0) {
+        // look at running jobs, considering users over limit
+        result =
+            getTaskFromRunningJobQueue(taskTracker, qsi, usersOverLimit, false);
+        lookUpStatus = result.getLookUpStatus();
+        if (lookUpStatus == TaskLookUpStatus.TASK_FOUND
+            || lookUpStatus == TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS) {
+          // No need for looking elsewhere
+          return result;
+        }
+      }
+
+      // found nothing for this queue, look at the next one.
+      String msg = "Found no task from the queue" + qsi.queueName;
+      LOG.info(msg);
+      return new TaskLookupResult(null, TaskLookUpStatus.NO_TASK_IN_QUEUE,
+          msg);
+    }
+
+    // get a task from the running queue
+    private TaskLookupResult getTaskFromRunningJobQueue(
+        TaskTrackerStatus taskTracker, QueueSchedulingInfo qsi,
+        Set<String> usersOverLimit, boolean skipUsersOverLimit)
+        throws IOException {
+
+      for (JobInProgress j : scheduler.jobQueuesManager
+          .getRunningJobQueue(qsi.queueName)) {
+        // some jobs may be in the running queue but may have completed
         // and not yet have been removed from the running queue
         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
           continue;
         }
-        // is this job's user over limit?
-        if (isUserOverLimit(j, qsi)) {
-          // user over limit. 
-          usersOverLimit.add(j.getProfile().getUser());
-          continue;
+
+        if (skipUsersOverLimit) {
+          // consider jobs of only those users who are under limits
+          if (isUserOverLimit(j, qsi)) {
+            usersOverLimit.add(j.getProfile().getUser());
+            continue;
+          }
+        } else {
+          // consider jobs of only those users who are over limit
+          if (!usersOverLimit.contains(j.getProfile().getUser())) {
+            continue;
+          }
         }
-        // We found a suitable job. Get task from it.
-        t = obtainNewTask(taskTracker, j);
-        if (t != null) {
-          LOG.debug("Got task from job " + 
-                    j.getJobID() + " in queue " + qsi.queueName);
-          return t;
+
+        // We found a suitable job. Try getting a task from it.
+        TaskLookupResult tlr = getTaskFromJob(j, taskTracker, qsi);
+        TaskLookUpStatus lookUpStatus = tlr.getLookUpStatus();
+        if (lookUpStatus == TaskLookUpStatus.NO_TASK_IN_JOB) {
+          // Go to the next job in the same queue.
+          continue;
+        } else if (lookUpStatus == TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS
+            || lookUpStatus == TaskLookUpStatus.TASK_FOUND) {
+          // No need for considering the next jobs in this queue.
+          return tlr;
         }
       }
-      
 
-      
-      // if we're here, we haven't found anything. This could be because 
-      // there is nothing to run, or that the user limit for some user is 
-      // too strict, i.e., there's at least one user who doesn't have
-      // enough tasks to satisfy his limit. If it's the later case, look at 
-      // jobs without considering user limits, and get task from first 
-      // eligible job
-      if (usersOverLimit.size() > 0) {
-        for (JobInProgress j:
-          scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
-          if ((j.getStatus().getRunState() == JobStatus.RUNNING) && 
-              (usersOverLimit.contains(j.getProfile().getUser()))) {
-            t = obtainNewTask(taskTracker, j);
-            if (t != null) {
-              LOG.debug("Getting task from job " + 
-                        j.getJobID() + " in queue " + qsi.queueName);
-              return t;
-            }
+      String msg =
+          qsi.queueName + " queue's running jobs queue don't have "
+              + "any more tasks to run.";
+      LOG.info(msg);
+      return new TaskLookupResult(null,
+          TaskLookUpStatus.NO_TASK_IN_QUEUE, msg);
+    }
+
+    private TaskLookupResult getTaskFromJob(JobInProgress j,
+        TaskTrackerStatus taskTracker, QueueSchedulingInfo qsi)
+        throws IOException {
+      String msg;
+
+      if (getPendingTasks(j) != 0) {
+        // Not accurate TODO:
+        if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
+          // We found a suitable job. Get task from it.
+          Task t = obtainNewTask(taskTracker, j);
+          if (t != null) {
+            msg =
+                "Got task from job " + j.getJobID() + " in queue "
+                    + qsi.queueName;
+            LOG.debug(msg);
+            return new TaskLookupResult(t, TaskLookUpStatus.TASK_FOUND, msg);
           }
+        } else {
+          // block the cluster, till this job's tasks can be scheduled.
+          msg =
+              j.getJobID() + "'s tasks don't fit on the TaskTracker "
+                  + taskTracker.trackerName
+                  + ". Returning no task to the taskTracker";
+          LOG.info(msg);
+          return new TaskLookupResult(null,
+              TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS, msg);
         }
       }
-      
-      return null;
+
+      msg = j.getJobID() + " doesn't have any more tasks to run.";
+      LOG.debug(msg);
+      return new TaskLookupResult(null,
+          TaskLookUpStatus.NO_TASK_IN_JOB, msg);
     }
-    
+
     private List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException {
       Task t = null;
 
@@ -692,7 +800,7 @@ class CapacityTaskScheduler extends TaskScheduler {
        * becomes expensive, do it once every few hearbeats only.
        */ 
       updateQSIObjects();
-      LOG.debug("After updating QSI objects:");
+      LOG.debug("After updating QSI objects in " + this.type + " scheduler :");
       printQSIs();
       /*
        * sort list of qeues first, as we want queues that need the most to
@@ -700,7 +808,7 @@ class CapacityTaskScheduler extends TaskScheduler {
        * We're only sorting a collection of queues - there shouldn't be many.
        */
       updateCollectionOfQSIs();
-      for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
+      for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
         if (qsi.guaranteedCapacity <= 0.0f) {
           // No capacity is guaranteed yet for this queue.
           // Queues are sorted so that ones without capacities
@@ -708,13 +816,32 @@ class CapacityTaskScheduler extends TaskScheduler {
           // from here without considering any further queues.
           return null;
         }
-        t = getTaskFromQueue(taskTracker, qsi);
-        if (t!= null) {
+
+        TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
+        TaskLookUpStatus lookUpStatus = tlr.getLookUpStatus();
+
+        if (lookUpStatus == TaskLookUpStatus.NO_TASK_IN_QUEUE) {
+          continue; // Look in other queues.
+        }
+
+        if (lookUpStatus == TaskLookUpStatus.TASK_FOUND) {
+          t = tlr.getTask();
           // we have a task. Update reclaimed resource info
           updateReclaimedResources(qsi);
           return Collections.singletonList(t);
         }
-      }        
+        
+        if (lookUpStatus == TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS) {
+          // blocking the cluster.
+          String msg = tlr.getLookupStatusInfo();
+          if (msg != null) {
+            LOG.warn(msg);
+            LOG.warn("Returning nothing to the Tasktracker "
+                + taskTracker.trackerName);
+            return null;
+          }
+        }
+      }
 
       // nothing to give
       return null;
@@ -745,7 +872,7 @@ class CapacityTaskScheduler extends TaskScheduler {
   private static class MapSchedulingMgr extends TaskSchedulingMgr {
     MapSchedulingMgr(CapacityTaskScheduler dad) {
       super(dad);
-      type = new String("map");
+      type = TaskSchedulingMgr.TYPE.MAP;
     }
     Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
     throws IOException {
@@ -822,7 +949,7 @@ class CapacityTaskScheduler extends TaskScheduler {
   private static class ReduceSchedulingMgr extends TaskSchedulingMgr {
     ReduceSchedulingMgr(CapacityTaskScheduler dad) {
       super(dad);
-      type = new String("reduce");
+      type = TaskSchedulingMgr.TYPE.REDUCE;
     }
     Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
     throws IOException {
@@ -870,7 +997,9 @@ class CapacityTaskScheduler extends TaskScheduler {
   /** the scheduling mgrs for Map and Reduce tasks */ 
   protected TaskSchedulingMgr mapScheduler = new MapSchedulingMgr(this);
   protected TaskSchedulingMgr reduceScheduler = new ReduceSchedulingMgr(this);
-  
+
+  MemoryMatcher memoryMatcher = new MemoryMatcher(this);
+
   /** name of the default queue. */ 
   static final String DEFAULT_QUEUE_NAME = "default";
   
@@ -880,7 +1009,7 @@ class CapacityTaskScheduler extends TaskScheduler {
    * heartbeats left. */
   private static final int HEARTBEATS_LEFT_BEFORE_KILLING = 3;
 
-  private static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
+  static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
   protected JobQueuesManager jobQueuesManager;
   protected CapacitySchedulerConf rmConf;
   /** whether scheduler has started or not */
@@ -924,6 +1053,10 @@ class CapacityTaskScheduler extends TaskScheduler {
   private Clock clock;
   private JobInitializationPoller initializationPoller;
 
+  long limitMaxVmemForTasks;
+  long limitMaxPmemForTasks;
+  long defaultMaxVmPerTask;
+  float defaultPercentOfPmemInVmem;
 
   public CapacityTaskScheduler() {
     this(new Clock());
@@ -939,7 +1072,40 @@ class CapacityTaskScheduler extends TaskScheduler {
   public void setResourceManagerConf(CapacitySchedulerConf conf) {
     this.rmConf = conf;
   }
-  
+
+  /**
+   * Normalize the negative values in configuration
+   * 
+   * @param val
+   * @return normalized value
+   */
+  private long normalizeMemoryConfigValue(long val) {
+    if (val < 0) {
+      val = JobConf.DISABLED_MEMORY_LIMIT;
+    }
+    return val;
+  }
+
+  private void initializeMemoryRelatedConf() {
+    limitMaxVmemForTasks =
+        normalizeMemoryConfigValue(conf.getLong(
+            JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+
+    limitMaxPmemForTasks =
+        normalizeMemoryConfigValue(rmConf.getLimitMaxPmemForTasks());
+
+    defaultMaxVmPerTask =
+        normalizeMemoryConfigValue(conf.getLong(
+            JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+
+    defaultPercentOfPmemInVmem = rmConf.getDefaultPercentOfPmemInVmem();
+    if (defaultPercentOfPmemInVmem < 0) {
+      defaultPercentOfPmemInVmem = JobConf.DISABLED_MEMORY_LIMIT;
+    }
+  }
+
   @Override
   public synchronized void start() throws IOException {
     if (started) return;
@@ -947,10 +1113,14 @@ class CapacityTaskScheduler extends TaskScheduler {
     RECLAIM_CAPACITY_INTERVAL = 
       conf.getLong("mapred.capacity-scheduler.reclaimCapacity.interval", 5);
     RECLAIM_CAPACITY_INTERVAL *= 1000;
+
     // initialize our queues from the config settings
     if (null == rmConf) {
       rmConf = new CapacitySchedulerConf();
     }
+
+    initializeMemoryRelatedConf();
+
     // read queue info from config file
     QueueManager queueManager = taskTrackerManager.getQueueManager();
     Set<String> queues = queueManager.getQueues();
@@ -1118,13 +1288,52 @@ class CapacityTaskScheduler extends TaskScheduler {
     }
     return tasks;
   }
-  
+
+  /**
+   * Kill the job if it has invalid requirements and return why it is killed
+   * 
+   * @param job
+   * @return string mentioning why the job is killed. Null if the job has valid
+   *         requirements.
+   */
+  private String killJobIfInvalidRequirements(JobInProgress job) {
+    if (!memoryMatcher.isSchedulingBasedOnVmemEnabled()) {
+      return null;
+    }
+    if ((job.getMaxVirtualMemoryForTask() > limitMaxVmemForTasks)
+        || (memoryMatcher.isSchedulingBasedOnPmemEnabled() && (job
+            .getMaxPhysicalMemoryForTask() > limitMaxPmemForTasks))) {
+      String msg =
+          job.getJobID() + " (" + job.getMaxVirtualMemoryForTask() + "vmem, "
+              + job.getMaxPhysicalMemoryForTask()
+              + "pmem) exceeds the cluster's max-memory-limits ("
+              + limitMaxVmemForTasks + "vmem, " + limitMaxPmemForTasks
+              + "pmem). Cannot run in this cluster, so killing it.";
+      LOG.warn(msg);
+      try {
+        taskTrackerManager.killJob(job.getJobID());
+        return msg;
+      } catch (IOException ioe) {
+        LOG.warn("Failed to kill the job " + job.getJobID() + ". Reason : "
+            + StringUtils.stringifyException(ioe));
+      }
+    }
+    return null;
+  }
+
   // called when a job is added
-  synchronized void jobAdded(JobInProgress job) {
+  synchronized void jobAdded(JobInProgress job) throws IOException {
     // let our map and reduce schedulers know this, so they can update 
     // user-specific info
     mapScheduler.jobAdded(job);
     reduceScheduler.jobAdded(job);
+
+    // Kill the job if it cannot run in the cluster because of invalid
+    // resource requirements.
+    String statusMsg = killJobIfInvalidRequirements(job);
+    if (statusMsg != null) {
+      throw new IOException(statusMsg);
+    }
   }
 
   // called when a job completes

+ 2 - 1
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -158,7 +159,7 @@ class JobQueuesManager extends JobInProgressListener {
   }
   
   @Override
-  public void jobAdded(JobInProgress job) {
+  public void jobAdded(JobInProgress job) throws IOException {
     LOG.info("Job submitted to queue " + job.getProfile().getQueueName());
     // add job to the right queue
     QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());

+ 246 - 0
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java

@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+class MemoryMatcher {
+
+  private static final Log LOG = LogFactory.getLog(MemoryMatcher.class);
+  private CapacityTaskScheduler scheduler;
+
+  public MemoryMatcher(CapacityTaskScheduler capacityTaskScheduler) {
+    this.scheduler = capacityTaskScheduler;
+  }
+
+  boolean isSchedulingBasedOnVmemEnabled() {
+    LOG.debug("defaultMaxVmPerTask : " + scheduler.defaultMaxVmPerTask
+        + " limitMaxVmemForTasks : " + scheduler.limitMaxVmemForTasks);
+    if (scheduler.defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT
+        || scheduler.limitMaxVmemForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
+      return false;
+    }
+    return true;
+  }
+
+  boolean isSchedulingBasedOnPmemEnabled() {
+    LOG.debug("defaultPercentOfPmemInVmem : "
+        + scheduler.defaultPercentOfPmemInVmem + " limitMaxPmemForTasks : "
+        + scheduler.limitMaxPmemForTasks);
+    if (scheduler.defaultPercentOfPmemInVmem == JobConf.DISABLED_MEMORY_LIMIT
+        || scheduler.limitMaxPmemForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Obtain the virtual memory allocated for a job's tasks.
+   * 
+   * If the job has a configured value for the max-virtual memory, that will be
+   * returned. Else, the cluster-wide default max-virtual memory for tasks is
+   * returned.
+   * 
+   * This method can only be called after
+   * {@link CapacityTaskScheduler#initializeMemoryRelatedConf()} is invoked.
+   * 
+   * @param jConf JobConf of the job
+   * @return the virtual memory allocated for the job's tasks.
+   */
+  private long getVirtualMemoryForTask(JobConf jConf) {
+    long vMemForTask = jConf.getMaxVirtualMemoryForTask();
+    if (vMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
+      vMemForTask =
+          new JobConf().getLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+              scheduler.defaultMaxVmPerTask);
+    }
+    return vMemForTask;
+  }
+
+  /**
+   * Obtain the physical memory allocated for a job's tasks.
+   * 
+   * If the job has a configured value for the max physical memory, that
+   * will be returned. Else, the cluster-wide default physical memory for
+   * tasks is returned.
+   * 
+   * This method can only be called after
+   * {@link CapacityTaskScheduler#initializeMemoryRelatedConf()} is invoked.
+   * 
+   * @param jConf JobConf of the job
+   * @return the physical memory allocated for the job's tasks
+   */
+  private long getPhysicalMemoryForTask(JobConf jConf) {
+    long pMemForTask = jConf.getMaxPhysicalMemoryForTask();
+    if (pMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
+      pMemForTask =
+          Math.round(getVirtualMemoryForTask(jConf)
+              * scheduler.defaultPercentOfPmemInVmem);
+    }
+    return pMemForTask;
+  }
+
+  static class Memory {
+    long vmem;
+    long pmem;
+
+    Memory(long vm, long pm) {
+      this.vmem = vm;
+      this.pmem = pm;
+    }
+  }
+
+  /**
+   * Find the memory that is already used by all the running tasks
+   * residing on the given TaskTracker.
+   * 
+   * @param taskTracker
+   * @return amount of memory that is used by the residing tasks
+   */
+  private synchronized Memory getMemReservedForTasks(
+      TaskTrackerStatus taskTracker) {
+    boolean disabledVmem = false;
+    boolean disabledPmem = false;
+
+    if (scheduler.defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
+      disabledVmem = true;
+    }
+
+    if (scheduler.defaultPercentOfPmemInVmem == JobConf.DISABLED_MEMORY_LIMIT) {
+      disabledPmem = true;
+    }
+
+    if (disabledVmem && disabledPmem) {
+      return new Memory(JobConf.DISABLED_MEMORY_LIMIT,
+          JobConf.DISABLED_MEMORY_LIMIT);
+    }
+
+    long vmem = 0;
+    long pmem = 0;
+
+    for (TaskStatus task : taskTracker.getTaskReports()) {
+      // the following task states are one in which the slot is
+      // still occupied and hence memory of the task should be
+      // accounted in used memory.
+      if ((task.getRunState() == TaskStatus.State.RUNNING)
+          || (task.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
+        JobConf jConf =
+            scheduler.taskTrackerManager.getJob(task.getTaskID().getJobID())
+                .getJobConf();
+        if (!disabledVmem) {
+          vmem += getVirtualMemoryForTask(jConf);
+        }
+        if (!disabledPmem) {
+          pmem += getPhysicalMemoryForTask(jConf);
+        }
+      }
+    }
+
+    return new Memory(vmem, pmem);
+  }
+
+  /**
+   * Check if a TT has enough pmem and vmem to run this job.
+   * @param job
+   * @param taskTracker
+   * @return true if this TT has enough memory for this job. False otherwise.
+   */
+  boolean matchesMemoryRequirements(JobInProgress job,
+      TaskTrackerStatus taskTracker) {
+
+    // ////////////// vmem based scheduling
+    if (!isSchedulingBasedOnVmemEnabled()) {
+      LOG.debug("One of the configuration parameters defaultMaxVmPerTask "
+          + "and limitMaxVmemPerTasks is not configured. Scheduling based "
+          + "on job's memory requirements is disabled, ignoring any value "
+          + "set by job.");
+      return true;
+    }
+
+    TaskTrackerStatus.ResourceStatus resourceStatus =
+        taskTracker.getResourceStatus();
+    long totalVMemOnTT = resourceStatus.getTotalVirtualMemory();
+    long reservedVMemOnTT = resourceStatus.getReservedTotalMemory();
+
+    if (totalVMemOnTT == JobConf.DISABLED_MEMORY_LIMIT
+        || reservedVMemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
+      return true;
+    }
+
+    if (reservedVMemOnTT > totalVMemOnTT) {
+      return true;
+    }
+
+    long jobVMemForTask = job.getMaxVirtualMemoryForTask();
+    if (jobVMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
+      jobVMemForTask = scheduler.defaultMaxVmPerTask;
+    }
+
+    Memory memReservedForTasks = getMemReservedForTasks(taskTracker);
+    long vmemUsedOnTT = memReservedForTasks.vmem;
+    long pmemUsedOnTT = memReservedForTasks.pmem;
+
+    long freeVmemUsedOnTT = totalVMemOnTT - vmemUsedOnTT - reservedVMemOnTT;
+
+    if (jobVMemForTask > freeVmemUsedOnTT) {
+      return false;
+    }
+
+    // ////////////// pmem based scheduling
+
+    long totalPmemOnTT = resourceStatus.getTotalPhysicalMemory();
+    long reservedPmemOnTT = resourceStatus.getReservedPhysicalMemory();
+    long jobPMemForTask = job.getMaxPhysicalMemoryForTask();
+    long freePmemUsedOnTT = 0;
+
+    if (isSchedulingBasedOnPmemEnabled()) {
+      if (totalPmemOnTT == JobConf.DISABLED_MEMORY_LIMIT
+          || reservedPmemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
+        return true;
+      }
+
+      if (reservedPmemOnTT > totalPmemOnTT) {
+        return true;
+      }
+
+      if (jobPMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
+        jobPMemForTask =
+            Math.round(jobVMemForTask * scheduler.defaultPercentOfPmemInVmem);
+      }
+
+      freePmemUsedOnTT = totalPmemOnTT - pmemUsedOnTT - reservedPmemOnTT;
+
+      if (jobPMemForTask > freePmemUsedOnTT) {
+        return false;
+      }
+    } else {
+      LOG.debug("One of the configuration parameters "
+          + "defaultPercentOfPmemInVmem and limitMaxPmemPerTasks is not "
+          + "configured. Scheduling based on job's physical memory "
+          + "requirements is disabled, ignoring any value set by job.");
+    }
+
+    LOG.debug("freeVMemOnTT = " + freeVmemUsedOnTT + " totalVMemOnTT = "
+        + totalVMemOnTT + " freePMemOnTT = " + freePmemUsedOnTT
+        + " totalPMemOnTT = " + totalPmemOnTT + " jobVMemForTask = "
+        + jobVMemForTask + " jobPMemForTask = " + jobPMemForTask);
+    return true;
+  }
+}

+ 421 - 27
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -31,13 +31,19 @@ import java.util.TreeMap;
 
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.conf.Configuration;
 
 
 public class TestCapacityScheduler extends TestCase {
-  
+
+  static final Log LOG =
+      LogFactory.getLog(org.apache.hadoop.mapred.TestCapacityScheduler.class);
+
   private static int jobCounter;
   
   /**
@@ -147,8 +153,7 @@ public class TestCapacityScheduler extends TestCase {
       new HashSet<TaskInProgress>();
     
     public FakeJobInProgress(JobID jId, JobConf jobConf,
-        FakeTaskTrackerManager taskTrackerManager, String user) 
-    throws IOException {
+        FakeTaskTrackerManager taskTrackerManager, String user) {
       super(jId, jobConf);
       this.taskTrackerManager = taskTrackerManager;
       this.startTime = System.currentTimeMillis();
@@ -165,6 +170,8 @@ public class TestCapacityScheduler extends TestCase {
       }
       mapTaskCtr = 0;
       redTaskCtr = 0;
+      super.setMaxVirtualMemoryForTask(jobConf.getMaxVirtualMemoryForTask());
+      super.setMaxPhysicalMemoryForTask(jobConf.getMaxPhysicalMemoryForTask());
     }
     
     @Override
@@ -301,20 +308,23 @@ public class TestCapacityScheduler extends TestCase {
       new HashMap<String, TaskTrackerStatus>();
     private Map<String, TaskStatus> taskStatuses = 
       new HashMap<String, TaskStatus>();
+    private Map<JobID, JobInProgress> jobs =
+        new HashMap<JobID, JobInProgress>();
 
     public FakeTaskTrackerManager() {
-      this(2, 1);
+      this(2, 2, 1);
     }
-    
-    public FakeTaskTrackerManager(int maxMapSlots, int maxReduceSlots) {
-      trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
-          new ArrayList<TaskStatus>(), 0,
-          maxMapSlots, maxReduceSlots));
-      maxMapTasksPerTracker = maxMapSlots;
-      trackers.put("tt2", new TaskTrackerStatus("tt2", "tt2.host", 2,
-          new ArrayList<TaskStatus>(), 0,
-          maxMapSlots, maxReduceSlots));
-      maxReduceTasksPerTracker = maxReduceSlots;
+
+    public FakeTaskTrackerManager(int numTaskTrackers,
+        int maxMapTasksPerTracker, int maxReduceTasksPerTracker) {
+      this.maxMapTasksPerTracker = maxMapTasksPerTracker;
+      this.maxReduceTasksPerTracker = maxReduceTasksPerTracker;
+      for (int i = 1; i < numTaskTrackers + 1; i++) {
+        String ttName = "tt" + i;
+        trackers.put(ttName, new TaskTrackerStatus(ttName, ttName + ".host", i,
+            new ArrayList<TaskStatus>(), 0, maxMapTasksPerTracker,
+            maxReduceTasksPerTracker));
+      }
     }
     
     public void addTaskTracker(String ttName) {
@@ -338,7 +348,23 @@ public class TestCapacityScheduler extends TestCase {
     public int getNextHeartbeatInterval() {
       return MRConstants.HEARTBEAT_INTERVAL_MIN;
     }
-    
+
+    @Override
+    public void killJob(JobID jobid) throws IOException {
+      JobInProgress job = jobs.get(jobid);
+      finalizeJob(job, JobStatus.KILLED);
+      job.kill();
+    }
+
+    @Override
+    public JobInProgress getJob(JobID jobid) {
+      return jobs.get(jobid);
+    }
+
+    Collection<JobInProgress> getJobs() {
+      return jobs.values();
+    }
+
     public Collection<TaskTrackerStatus> taskTrackers() {
       return trackers.values();
     }
@@ -352,7 +378,8 @@ public class TestCapacityScheduler extends TestCase {
       listeners.remove(listener);
     }
     
-    public void submitJob(JobInProgress job) {
+    public void submitJob(JobInProgress job) throws IOException {
+      jobs.put(job.getJobID(), job);
       for (JobInProgressListener listener : listeners) {
         listener.jobAdded(job);
       }
@@ -369,6 +396,11 @@ public class TestCapacityScheduler extends TestCase {
         reduces++;
       }
       TaskStatus status = new TaskStatus() {
+        @Override
+        public TaskAttemptID getTaskID() {
+          return t.getTaskID();
+        }
+
         @Override
         public boolean getIsMap() {
           return t.isMapTask();
@@ -393,9 +425,13 @@ public class TestCapacityScheduler extends TestCase {
     }
     
     void finalizeJob(FakeJobInProgress fjob) {
+      finalizeJob(fjob, JobStatus.SUCCEEDED);
+    }
+
+    void finalizeJob(JobInProgress fjob, int state) {
       // take a snapshot of the status before changing it
       JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
-      fjob.getStatus().setRunState(JobStatus.SUCCEEDED);
+      fjob.getStatus().setRunState(state);
       JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
       JobStatusChangeEvent event = 
         new JobStatusChangeEvent (fjob, EventType.RUN_STATE_CHANGED, oldStatus, 
@@ -539,9 +575,16 @@ public class TestCapacityScheduler extends TestCase {
   private FakeClock clock;
 
   @Override
-  protected void setUp() throws Exception {
+  protected void setUp() {
+    setUp(2, 2, 1);
+  }
+
+  private void setUp(int numTaskTrackers, int numMapTasksPerTracker,
+      int numReduceTasksPerTracker) {
     jobCounter = 0;
-    taskTrackerManager = new FakeTaskTrackerManager();
+    taskTrackerManager =
+        new FakeTaskTrackerManager(numTaskTrackers, numMapTasksPerTracker,
+            numReduceTasksPerTracker);
     clock = new FakeClock();
     scheduler = new CapacityTaskScheduler(clock);
     scheduler.setTaskTrackerManager(taskTrackerManager);
@@ -559,7 +602,24 @@ public class TestCapacityScheduler extends TestCase {
       scheduler.terminate();
     }
   }
-  
+
+  private FakeJobInProgress submitJob(int state, JobConf jobConf) throws IOException {
+    FakeJobInProgress job =
+        new FakeJobInProgress(new JobID("test", ++jobCounter),
+            (jobConf == null ? new JobConf() : jobConf), taskTrackerManager,
+            jobConf.getUser());
+    job.getStatus().setRunState(state);
+    taskTrackerManager.submitJob(job);
+    return job;
+  }
+
+  private FakeJobInProgress submitJobAndInit(int state, JobConf jobConf)
+      throws IOException {
+    FakeJobInProgress j = submitJob(state, jobConf);
+    scheduler.jobQueuesManager.jobUpdated(initTasksAndReportEvent(j));
+    return j;
+  }
+
   private FakeJobInProgress submitJob(int state, int maps, int reduces, 
       String queue, String user) throws IOException {
     JobConf jobConf = new JobConf(conf);
@@ -567,11 +627,8 @@ public class TestCapacityScheduler extends TestCase {
     jobConf.setNumReduceTasks(reduces);
     if (queue != null)
       jobConf.setQueueName(queue);
-    FakeJobInProgress job = new FakeJobInProgress(
-        new JobID("test", ++jobCounter), jobConf, taskTrackerManager, user);
-    job.getStatus().setRunState(state);
-    taskTrackerManager.submitJob(job);
-    return job;
+    jobConf.setUser(user);
+    return submitJob(state, jobConf);
   }
   
   // Submit a job and update the listeners
@@ -1285,6 +1342,344 @@ public class TestCapacityScheduler extends TestCase {
     assertEquals(schedulingInfo, schedulingInfo2);   
   }
 
+  /**
+   * Test to verify that highMemoryJobs are scheduled like all other jobs when
+   * memory-based scheduling is not enabled.
+   * @throws IOException
+   */
+  public void testDisabledMemoryBasedScheduling()
+      throws IOException {
+
+    LOG.debug("Starting the scheduler.");
+    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
+
+    // Limited TT - 1GB vmem and 512MB pmem
+    taskTrackerManager.getTaskTracker("tt1").getResourceStatus()
+        .setTotalVirtualMemory(1 * 1024 * 1024 * 1024L);
+    taskTrackerManager.getTaskTracker("tt1").getResourceStatus()
+        .setTotalPhysicalMemory(512 * 1024 * 1024L);
+
+    taskTrackerManager.addQueues(new String[] { "default" });
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    // memory-based scheduling disabled by default.
+    scheduler.start();
+
+    LOG.debug("Submit one high memory(3GB vmem, 1GBpmem) job of 1 map task "
+        + "and 1 reduce task.");
+    JobConf jConf = new JobConf();
+    jConf.setMaxVirtualMemoryForTask(3 * 1024 * 1024 * 1024L); // 3GB vmem
+    jConf.setMaxPhysicalMemoryForTask(1 * 1024 * 1024 * 1024L); // 1 GB pmem
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(1);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    submitJobAndInit(JobStatus.RUNNING, jConf);
+
+    // assert that all tasks are launched even though they transgress the
+    // scheduling limits.
+
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+  }
+
+  /**
+   * Test to verify that highPmemJobs are scheduled like all other jobs when
+   * physical-memory based scheduling is not enabled.
+   * @throws IOException
+   */
+  public void testDisabledPmemBasedScheduling()
+      throws IOException {
+
+    LOG.debug("Starting the scheduler.");
+    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
+
+    // Limited TT - 100GB vmem and 500MB pmem
+    TaskTrackerStatus.ResourceStatus ttStatus =
+        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
+    ttStatus.setTotalVirtualMemory(100 * 1024 * 1024 * 1024L);
+    ttStatus.setReservedVirtualMemory(0);
+    ttStatus.setTotalPhysicalMemory(500 * 1024 * 1024L);
+    ttStatus.setReservedPhysicalMemory(0);
+
+    taskTrackerManager.addQueues(new String[] { "default" });
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    // enable vmem-based scheduling. pmem based scheduling disabled by default.
+    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+        1536 * 1024 * 1024L);
+    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+        3 * 1024 * 1024 * 1024L);
+    scheduler.start();
+
+    LOG.debug("Submit one high pmem(3GB vmem, 1GBpmem) job of 1 map task "
+        + "and 1 reduce task.");
+    JobConf jConf = new JobConf();
+    jConf.setMaxVirtualMemoryForTask(3 * 1024 * 1024 * 1024L); // 3GB vmem
+    jConf.setMaxPhysicalMemoryForTask(1 * 1024 * 1024 * 1024L); // 1 GB pmem
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(1);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    submitJobAndInit(JobStatus.RUNNING, jConf);
+
+    // assert that all tasks are launched even though they transgress the
+    // scheduling limits.
+
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+  }
+
+  /**
+   * Test HighMemoryJobs.
+   * @throws IOException
+   */
+  public void testHighMemoryJobs()
+      throws IOException {
+
+    LOG.debug("Starting the scheduler.");
+    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
+
+    TaskTrackerStatus.ResourceStatus ttStatus =
+        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
+    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
+    ttStatus.setReservedVirtualMemory(0);
+    ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L);
+    ttStatus.setReservedPhysicalMemory(0);
+    // Normal job on this TT would be 1.5GB vmem, 0.5GB pmem
+
+    taskTrackerManager.addQueues(new String[] { "default" });
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    // enabled memory-based scheduling
+    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+        1536 * 1024 * 1024L);
+    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+        3 * 1024 * 1024 * 1024L);
+    resConf.setDefaultPercentOfPmemInVmem(33.3f);
+    resConf.setLimitMaxPmemForTasks(1 * 1024 * 1024 * 1024L);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    LOG.debug("Submit one high memory(1600MB vmem, 400MB pmem) job of "
+        + "1 map task and 1 reduce task.");
+    JobConf jConf = new JobConf();
+    jConf.setMaxVirtualMemoryForTask(1600 * 1024 * 1024L); // 1.6GB vmem
+    jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(1);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+
+    // No more tasks of this job can run on the TT because of lack of vmem
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+
+    // Let attempt_test_0001_m_000001_0 finish, task assignment should succeed.
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job1);
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+
+    LOG.debug("Submit another high memory(1200MB vmem, 800MB pmem) job of "
+        + "1 map task and 0 reduces.");
+    jConf.setMaxVirtualMemoryForTask(1200 * 1024 * 1024L);
+    jConf.setMaxPhysicalMemoryForTask(800 * 1024 * 1024L);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    submitJobAndInit(JobStatus.PREP, jConf); // job2
+
+    // This job shouldn't run the TT now because of lack of pmem
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+
+    // Let attempt_test_0001_m_000002_0 finish, task assignment should succeed.
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", job1);
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+
+    LOG.debug("Submit a normal memory(200MB vmem, 100MB pmem) job of "
+        + "0 maps and 1 reduce task.");
+    jConf.setMaxVirtualMemoryForTask(200 * 1024 * 1024L);
+    jConf.setMaxPhysicalMemoryForTask(100 * 1024 * 1024L);
+    jConf.setNumMapTasks(0);
+    jConf.setNumReduceTasks(1);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    submitJobAndInit(JobStatus.PREP, jConf); // job3
+
+    checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
+  }
+
+  /**
+   * test invalid highMemoryJobs
+   * @throws IOException
+   */
+  public void testHighMemoryJobWithInvalidRequirements()
+      throws IOException {
+    LOG.debug("Starting the scheduler.");
+    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
+    TaskTrackerStatus.ResourceStatus ttStatus =
+        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
+    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024);
+    ttStatus.setReservedVirtualMemory(0);
+    ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024);
+    ttStatus.setReservedPhysicalMemory(0);
+
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+    taskTrackerManager.addQueues(new String[] { "default" });
+    resConf.setFakeQueues(queues);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    // enabled memory-based scheduling
+    long vmemUpperLimit = 1 * 1024 * 1024 * 1024L;
+    long vmemDefault = 1536 * 1024 * 1024L;
+    long pmemUpperLimit = vmemUpperLimit;
+    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+        vmemDefault);
+    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+        vmemUpperLimit);
+    resConf.setDefaultPercentOfPmemInVmem(33.3f);
+    resConf.setLimitMaxPmemForTasks(pmemUpperLimit);
+    scheduler.setResourceManagerConf(resConf);
+    ControlledInitializationPoller p = new ControlledInitializationPoller(
+        scheduler.jobQueuesManager,
+        resConf,
+        resConf.getQueues());
+    scheduler.setInitializationPoller(p);
+    scheduler.start();
+
+    LOG.debug("Submit one invalid high ram(5GB vmem, 3GB pmem) job of "
+        + "1 map, 0 reduce tasks.");
+    long jobMaxVmem = 5 * 1024 * 1024 * 1024L;
+    long jobMaxPmem = 3 * 1024 * 1024 * 1024L;
+    JobConf jConf = new JobConf();
+    jConf.setMaxVirtualMemoryForTask(jobMaxVmem);
+    jConf.setMaxPhysicalMemoryForTask(jobMaxPmem);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+
+    boolean throwsException = false;
+    String msg = null;
+    FakeJobInProgress job;
+    try {
+      job = submitJob(JobStatus.PREP, jConf);
+    } catch (IOException ioe) {
+      // job has to fail
+      throwsException = true;
+      msg = ioe.getMessage();
+    }
+
+    assertTrue(throwsException);
+    job = (FakeJobInProgress) taskTrackerManager.getJobs().toArray()[0];
+    assertTrue(msg.matches(job.getJobID() + " \\(" + jobMaxVmem + "vmem, "
+        + jobMaxPmem + "pmem\\) exceeds the cluster's max-memory-limits \\("
+        + vmemUpperLimit + "vmem, " + pmemUpperLimit
+        + "pmem\\). Cannot run in this cluster, so killing it."));
+    // For job, no cleanup task needed so gets killed immediately.
+    assertTrue(job.getStatus().getRunState() == JobStatus.KILLED);
+  }
+
+  /**
+   * Test blocking of cluster for lack of memory.
+   * @throws IOException
+   */
+  public void testClusterBlockingForLackOfMemory()
+      throws IOException {
+
+    LOG.debug("Starting the scheduler.");
+    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
+    TaskTrackerStatus.ResourceStatus ttStatus =
+        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
+    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
+    ttStatus.setReservedVirtualMemory(0);
+    ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L);
+    ttStatus.setReservedPhysicalMemory(0);
+
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+    taskTrackerManager.addQueues(new String[] { "default" });
+    resConf.setFakeQueues(queues);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    // enabled memory-based scheduling
+    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+        1536 * 1024 * 1024L);
+    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+        4 * 1024 * 1024 * 1024L);
+    resConf.setDefaultPercentOfPmemInVmem(33.3f);
+    resConf.setLimitMaxPmemForTasks(2 * 1024 * 1024 * 1024L);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    LOG.debug("Submit one high memory(4GB vmem, 512MB pmem) job of "
+        + "1 map, 0 reduce tasks.");
+    JobConf jConf = new JobConf();
+    jConf.setMaxVirtualMemoryForTask(4 * 1024 * 1024 * 1024L);
+    jConf.setMaxPhysicalMemoryForTask(512 * 1024 * 1024L);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+    // TTs should not run these jobs i.e. cluster blocked because of lack of
+    // vmem
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+
+    // Job should still be alive
+    assertTrue(job1.getStatus().getRunState() == JobStatus.RUNNING);
+
+    LOG.debug("Submit a normal job of 1 map, 0 reduce tasks.");
+    // Use cluster-wide defaults
+    jConf.setMaxVirtualMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
+    jConf.setMaxPhysicalMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
+    FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    // cluster should still be blocked for job1 and so even job2 should not run
+    // even though it is a normal job
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+
+    scheduler.taskTrackerManager.killJob(job2.getJobID());
+    scheduler.taskTrackerManager.killJob(job1.getJobID());
+
+    LOG.debug("Submit one high memory(2GB vmem, 2GB pmem) job of "
+        + "1 map, 0 reduce tasks.");
+    jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L);
+    jConf.setMaxPhysicalMemoryForTask(2 * 1024 * 1024 * 1024L);
+    FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
+    // TTs should not run these jobs i.e. cluster blocked because of lack of
+    // pmem now.
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    
+    // Job should still be alive
+    assertTrue(job3.getStatus().getRunState() == JobStatus.RUNNING);
+
+    LOG.debug("Submit a normal job of 1 map, 0 reduce tasks.");
+    // Use cluster-wide defaults
+    jConf.setMaxVirtualMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
+    jConf.setMaxPhysicalMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
+    submitJobAndInit(JobStatus.PREP, jConf); // job4
+
+    // cluster should still be blocked for job3 and so even job4 should not run
+    // even though it is a normal job
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+  }
+
   protected TaskTrackerStatus tracker(String taskTrackerName) {
     return taskTrackerManager.getTaskTracker(taskTrackerName);
   }
@@ -1311,7 +1706,7 @@ public class TestCapacityScheduler extends TestCase {
   public void testJobInitialization() throws Exception {
     // set up the scheduler
     String[] qs = { "default" };
-    taskTrackerManager = new FakeTaskTrackerManager(1, 1);
+    taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     taskTrackerManager.addQueues(qs);
     resConf = new FakeResourceManagerConf();
@@ -1580,5 +1975,4 @@ public class TestCapacityScheduler extends TestCase {
     return userJobs;
 
   }
-  
 }

+ 12 - 2
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

@@ -161,10 +161,20 @@ public class TestFairScheduler extends TestCase {
     public int getNextHeartbeatInterval() {
       return MRConstants.HEARTBEAT_INTERVAL_MIN;
     }
-    
+
+    @Override
+    public void killJob(JobID jobid) {
+      return;
+    }
+
+    @Override
+    public JobInProgress getJob(JobID jobid) {
+      return null;
+    }
+
     // Test methods
     
-    public void submitJob(JobInProgress job) {
+    public void submitJob(JobInProgress job) throws IOException {
       for (JobInProgressListener listener : listeners) {
         listener.jobAdded(job);
       }

+ 132 - 0
src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java

@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Plugin to calculate virtual and physical memories on Linux systems.
+ */
+public class LinuxMemoryCalculatorPlugin extends MemoryCalculatorPlugin {
+  private static final Log LOG =
+      LogFactory.getLog(LinuxMemoryCalculatorPlugin.class);
+
+  /**
+   * proc's meminfo virtual file has keys-values in the format
+   * "key:[ \t]*value[ \t]kB".
+   */
+  private static final String PROCFS_MEMFILE = "/proc/meminfo";
+  private static final Pattern PROCFS_MEMFILE_FORMAT =
+      Pattern.compile("^([a-zA-Z]*):[ \t]*([0-9]*)[ \t]kB");
+
+  // We just need the values for the keys MemTotal and SwapTotal
+  private static final String MEMTOTAL_STRING = "MemTotal";
+  private static final String SWAPTOTAL_STRING = "SwapTotal";
+
+  private long ramSize = 0;
+  private long swapSize = 0;
+
+  boolean readMemInfoFile = false;
+
+  private void readProcMemInfoFile() {
+
+    if (readMemInfoFile) {
+      return;
+    }
+
+    // Read "/proc/memInfo" file
+    BufferedReader in = null;
+    FileReader fReader = null;
+    try {
+      fReader = new FileReader(PROCFS_MEMFILE);
+      in = new BufferedReader(fReader);
+    } catch (FileNotFoundException f) {
+      // shouldn't happen....
+      return;
+    }
+
+    Matcher mat = null;
+
+    try {
+      String str = in.readLine();
+      while (str != null) {
+        mat = PROCFS_MEMFILE_FORMAT.matcher(str);
+        if (mat.find()) {
+          if (mat.group(1).equals(MEMTOTAL_STRING)) {
+            ramSize = Long.parseLong(mat.group(2));
+          } else if (mat.group(1).equals(SWAPTOTAL_STRING)) {
+            swapSize = Long.parseLong(mat.group(2));
+          }
+        }
+        str = in.readLine();
+      }
+    } catch (IOException io) {
+      LOG.warn("Error reading the stream " + io);
+    } finally {
+      // Close the streams
+      try {
+        fReader.close();
+        try {
+          in.close();
+        } catch (IOException i) {
+          LOG.warn("Error closing the stream " + in);
+        }
+      } catch (IOException i) {
+        LOG.warn("Error closing the stream " + fReader);
+      }
+    }
+
+    readMemInfoFile = true;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getPhysicalMemorySize() {
+    readProcMemInfoFile();
+    return ramSize * 1024;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getVirtualMemorySize() {
+    readProcMemInfoFile();
+    return (ramSize + swapSize) * 1024;
+  }
+
+  /**
+   * Test the {@link LinuxMemoryCalculatorPlugin}
+   * 
+   * @param args
+   */
+  public static void main(String[] args) {
+    LinuxMemoryCalculatorPlugin plugin = new LinuxMemoryCalculatorPlugin();
+    System.out.println("Physical memory Size(bytes) : "
+        + plugin.getPhysicalMemorySize());
+    System.out.println("Total Virtual memory Size(bytes) : "
+        + plugin.getVirtualMemorySize());
+  }
+}

+ 74 - 0
src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java

@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+
+/**
+ * Plugin to calculate virtual and physical memories on the system.
+ * 
+ */
+public abstract class MemoryCalculatorPlugin extends Configured {
+
+  /**
+   * Obtain the total size of the virtual memory present in the system.
+   * 
+   * @return virtual memory size in bytes.
+   */
+  public abstract long getVirtualMemorySize();
+
+  /**
+   * Obtain the total size of the physical memory present in the system.
+   * 
+   * @return physical memory size bytes.
+   */
+  public abstract long getPhysicalMemorySize();
+
+  /**
+   * Get the MemoryCalculatorPlugin from the class name and configure it. If
+   * class name is null, this method will try and return a memory calculator
+   * plugin available for this system.
+   * 
+   * @param clazz class-name
+   * @param conf configure the plugin with this.
+   * @return MemoryCalculatorPlugin
+   */
+  public static MemoryCalculatorPlugin getMemoryCalculatorPlugin(
+      Class<? extends MemoryCalculatorPlugin> clazz, Configuration conf) {
+
+    if (clazz != null) {
+      return ReflectionUtils.newInstance(clazz, conf);
+    }
+
+    // No class given, try a os specific class
+    try {
+      String osName = System.getProperty("os.name");
+      if (osName.startsWith("Linux")) {
+        return new LinuxMemoryCalculatorPlugin();
+      }
+    } catch (SecurityException se) {
+      // Failed to get Operating System name.
+      return null;
+    }
+
+    // Not supported on this system.
+    return null;
+  }
+}

+ 2 - 2
src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java

@@ -188,7 +188,7 @@ public class ProcfsBasedProcessTree {
    * Get the cumulative virtual memory used by all the processes in the
    * process-tree.
    * 
-   * @return cumulative virtual memory used by the process-tree in kilobytes.
+   * @return cumulative virtual memory used by the process-tree in bytes.
    */
   public long getCumulativeVmem() {
     long total = 0;
@@ -197,7 +197,7 @@ public class ProcfsBasedProcessTree {
         total += p.getVmem();
       }
     }
-    return total/1024;
+    return total;
   }
 
   /**

+ 4 - 1
src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java

@@ -52,8 +52,11 @@ interface InterTrackerProtocol extends VersionedProtocol {
                  so that the TaskTracker can synchronize itself.
    * Version 20: Changed status message due to changes in TaskStatus
    *             (HADOOP-4232)
+   * Version 21: Changed information reported in TaskTrackerStatus'
+   *             ResourceStatus and the corresponding accessor methods
+   *             (HADOOP-4035)
    */
-  public static final long versionID = 20L;
+  public static final long versionID = 21L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

+ 146 - 22
src/mapred/org/apache/hadoop/mapred/JobConf.java

@@ -108,14 +108,120 @@ public class JobConf extends Configuration {
    * A value which if set for memory related configuration options,
    * indicates that the options are turned off.
    */
-  static final long DISABLED_VIRTUAL_MEMORY_LIMIT = -1L;
+  public static final long DISABLED_MEMORY_LIMIT = -1L;
   
   /**
    * Name of the queue to which jobs will be submitted, if no queue
    * name is mentioned.
    */
   public static final String DEFAULT_QUEUE_NAME = "default";
-  
+
+  /**
+   * Cluster-wide configuration to be set by the administrators that provides
+   * default amount of maximum virtual memory for job's tasks. This has to be
+   * set on both the JobTracker node for the sake of scheduling decisions and on
+   * the TaskTracker nodes for the sake of memory management.
+   * 
+   * <p>
+   * 
+   * If a job doesn't specify its virtual memory requirement by setting
+   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to {@link #DISABLED_MEMORY_LIMIT},
+   * tasks are assured a memory limit set to this property. This property is
+   * disabled by default, and if not explicitly set to a valid value by the
+   * administrators and if a job doesn't specify its virtual memory
+   * requirements, the job's tasks will not be assured anything and may be
+   * killed by a TT that intends to control the total memory usage of the tasks
+   * via memory management functionality.
+   * 
+   * <p>
+   * 
+   * This value should in general be less than the cluster-wide configuration
+   * {@link #UPPER_LIMIT_ON_TASK_VMEM_PROPERTY} . If not or if it not set,
+   * TaskTracker's memory management may be disabled and a scheduler's memory
+   * based scheduling decisions will be affected. Please refer to the
+   * documentation of the configured scheduler to see how this property is used.
+   */
+  public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
+      "mapred.task.default.maxvmem";
+
+  /**
+   * The maximum amount of memory any task of this job will use.
+   * 
+   * <p>
+   * 
+   * This value will be used by TaskTrackers for monitoring the memory usage of
+   * tasks of this jobs. If a TaskTracker's memory management functionality is
+   * enabled, each task of this job will be allowed to use a maximum virtual
+   * memory specified by this property. If the task's memory usage goes over
+   * this value, the task will be failed by the TT. If not set, the cluster-wide
+   * configuration {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} is used as the
+   * default value for memory requirements. If this property cascaded with
+   * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} becomes equal to -1, job's
+   * tasks will not be assured anything and may be killed by a TT that intends
+   * to control the total memory usage of the tasks via memory management
+   * functionality. If the memory management functionality is disabled on a TT,
+   * this value is ignored.
+   * 
+   * <p>
+   * 
+   * This value should also be not more than the cluster-wide configuration
+   * {@link #UPPER_LIMIT_ON_TASK_VMEM_PROPERTY} which has to be set by the site
+   * administrators.
+   * 
+   * <p>
+   * 
+   * This value may be used by schedulers that support scheduling based on job's
+   * memory requirements. In general, a task of this job will be scheduled on a
+   * TaskTracker only if the amount of virtual memory still unoccupied on the
+   * TaskTracker is greater than or equal to this value. But different
+   * schedulers can take different decisions. Please refer to the documentation
+   * of the scheduler being configured to see if it does memory based scheduling
+   * and if it does, how this property is used by that scheduler.
+   * 
+   * @see #setMaxVirtualMemoryForTask(long)
+   * @see #getMaxVirtualMemoryForTask()
+   */
+  public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
+      "mapred.task.maxvmem";
+
+  /**
+   * The maximum amount of physical memory any task of a job will use.
+   * 
+   * <p>
+   * 
+   * This value may be used by schedulers that support scheduling based on job's
+   * memory requirements. In general, a task of this job will be scheduled on a
+   * TaskTracker, only if the amount of physical memory still unoccupied on the
+   * TaskTracker is greater than or equal to this value. But different
+   * schedulers can take different decisions. Please refer to the documentation
+   * of the scheduler being configured to see how it does memory based
+   * scheduling and how this variable is used by that scheduler.
+   * 
+   * @see #setMaxPhysicalMemoryForTask(long)
+   * @see #getMaxPhysicalMemoryForTask()
+   */
+  public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
+      "mapred.task.maxpmem";
+
+  /**
+   * Cluster-wide configuration to be set by the site administrators that
+   * provides an upper limit on the maximum virtual memory that can be specified
+   * by a job. The job configuration {@link #MAPRED_TASK_MAXVMEM_PROPERTY} and
+   * the cluster-wide configuration
+   * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} should, by definition, be
+   * less than this value. If the job configuration
+   * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} is more than this value,
+   * depending on the scheduler being configured, the job may be rejected or the
+   * job configuration may just be ignored.
+   * 
+   * <p>
+   * 
+   * If it is not set on a TaskTracker, TaskTracker's memory management will be
+   * disabled.
+   */
+  public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
+      "mapred.task.limit.maxvmem";
+
   /**
    * Construct a map/reduce job configuration.
    */
@@ -1346,35 +1452,53 @@ public class JobConf extends Configuration {
   }
   
   /**
-   * The maximum amount of memory any task of this job will use.
+   * The maximum amount of memory any task of this job will use. See
+   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
    * 
-   * A task of this job will be scheduled on a tasktracker, only if the
-   * amount of free memory on the tasktracker is greater than 
-   * or equal to this value.
+   * @return The maximum amount of memory any task of this job will use, in
+   *         bytes.
+   * @see #setMaxVirtualMemoryForTask(long)
+   */
+  public long getMaxVirtualMemoryForTask() {
+    return getLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
+  }
+
+  /**
+   * Set the maximum amount of memory any task of this job can use. See
+   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
    * 
-   * If set to {@link #DISABLED_VIRTUAL_MEMORY_LIMIT}, tasks are assured 
-   * a memory limit set to mapred.task.default.maxmemory. If the value of
-   * mapred.tasktracker.tasks.maxmemory is set to -1, this value is 
-   * ignored.
+   * @param vmem Maximum amount of virtual memory in bytes any task of this job
+   *          can use.
+   * @see #getMaxVirtualMemoryForTask()
+   */
+  public void setMaxVirtualMemoryForTask(long vmem) {
+    setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, vmem);
+  }
+
+  /**
+   * The maximum amount of physical memory any task of this job will use. See
+   * {@link #MAPRED_TASK_MAXPMEM_PROPERTY}
    * 
-   * @return The maximum amount of memory any task of this job will use, in kilobytes.
-   * @see #getMaxVirtualMemoryForTasks()
+   * @return The maximum amount of physical memory any task of this job will
+   *         use, in bytes.
+   * @see #setMaxPhysicalMemoryForTask(long)
    */
-  long getMaxVirtualMemoryForTask() {
-    return getLong("mapred.task.maxmemory", DISABLED_VIRTUAL_MEMORY_LIMIT);
+  public long getMaxPhysicalMemoryForTask() {
+    return getLong(JobConf.MAPRED_TASK_MAXPMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
   }
-  
+
   /**
-   * Set the maximum amount of memory any task of this job can use.
+   * Set the maximum amount of physical memory any task of this job can use. See
+   * {@link #MAPRED_TASK_MAXPMEM_PROPERTY}
    * 
-   * @param vmem Maximum amount of memory in kilobytes any task of this job 
-   * can use.
-   * @see #getMaxVirtualMemoryForTask()
+   * @param pmem Maximum amount of physical memory in bytes any task of this job
+   *          can use.
+   * @see #getMaxPhysicalMemoryForTask()
    */
-  void setMaxVirtualMemoryForTask(long vmem) {
-    setLong("mapred.task.maxmemory", vmem);
+  public void setMaxPhysicalMemoryForTask(long pmem) {
+    setLong(JobConf.MAPRED_TASK_MAXPMEM_PROPERTY, pmem);
   }
-  
+
   /**
    * Return the name of the queue to which this job is submitted.
    * Defaults to 'default'.

+ 19 - 3
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -150,6 +150,7 @@ class JobInProgress {
   private boolean hasSpeculativeReduces;
   private long inputLength = 0;
   private long maxVirtualMemoryForTask;
+  private long maxPhysicalMemoryForTask;
   
   // Per-job counters
   public static enum Counter { 
@@ -245,7 +246,8 @@ class JobInProgress {
     this.nonRunningReduces = new LinkedList<TaskInProgress>();    
     this.runningReduces = new LinkedHashSet<TaskInProgress>();
     this.resourceEstimator = new ResourceEstimator(this);
-    this.maxVirtualMemoryForTask = conf.getMaxVirtualMemoryForTask();
+    setMaxVirtualMemoryForTask(conf.getMaxVirtualMemoryForTask());
+    setMaxPhysicalMemoryForTask(conf.getMaxPhysicalMemoryForTask());
   }
 
   /**
@@ -349,6 +351,8 @@ class JobInProgress {
       jobInitKillStatus.initStarted = true;
     }
 
+    LOG.debug("initializing " + this.jobId);
+
     // log job info
     JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(), 
                                     this.startTime);
@@ -533,10 +537,22 @@ class JobInProgress {
   }
 
   // Accessors for resources.
-  public long getMaxVirtualMemoryForTask() {
+  long getMaxVirtualMemoryForTask() {
     return maxVirtualMemoryForTask;
   }
-  
+
+  void setMaxVirtualMemoryForTask(long maxVMem) {
+    maxVirtualMemoryForTask = maxVMem;
+  }
+
+  long getMaxPhysicalMemoryForTask() {
+    return maxPhysicalMemoryForTask;
+  }
+
+  void setMaxPhysicalMemoryForTask(long maxPMem) {
+    maxPhysicalMemoryForTask = maxPMem;
+  }
+
   // Update the job start/launch time (upon restart) and log to history
   synchronized void updateJobInfo(long startTime, long launchTime, int count) {
     // log and change to the job's start/launch time

+ 4 - 1
src/mapred/org/apache/hadoop/mapred/JobInProgressListener.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.IOException;
+
 /**
  * A listener for changes in a {@link JobInProgress job}'s lifecycle in the
  * {@link JobTracker}.
@@ -26,8 +28,9 @@ abstract class JobInProgressListener {
   /**
    * Invoked when a new job has been added to the {@link JobTracker}.
    * @param job The added job.
+   * @throws IOException 
    */
-  public abstract void jobAdded(JobInProgress job);
+  public abstract void jobAdded(JobInProgress job) throws IOException;
 
   /**
    * Invoked when a job has been removed from the {@link JobTracker}.

+ 6 - 1
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -2247,7 +2247,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       synchronized (taskScheduler) {
         jobs.put(job.getProfile().getJobID(), job);
         for (JobInProgressListener listener : jobInProgressListeners) {
-          listener.jobAdded(job);
+          try {
+            listener.jobAdded(job);
+          } catch (IOException ioe) {
+            LOG.warn("Failed to add and so skipping the job : "
+                + job.getJobID() + ". Exception : " + ioe);
+          }
         }
       }
     }

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/JvmManager.java

@@ -291,7 +291,7 @@ class JvmManager {
       if (tracker.isTaskMemoryManagerEnabled()) {
         tracker.getTaskMemoryManager().addTask(
             TaskAttemptID.forName(env.conf.get("mapred.task.id")),
-            tracker.getMemoryForTask(env.conf));
+            tracker.getVirtualMemoryForTask(env.conf));
       }
       setRunningTaskForJvm(jvmRunner.jvmId, t);
       LOG.info(jvmRunner.getName());

+ 17 - 8
src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java

@@ -60,7 +60,9 @@ class TaskMemoryManagerThread extends Thread {
     tasksToBeAdded = new HashMap<TaskAttemptID, ProcessTreeInfo>();
     tasksToBeRemoved = new ArrayList<TaskAttemptID>();
 
-    maxMemoryAllowedForAllTasks = taskTracker.getMaxVirtualMemoryForTasks();
+    maxMemoryAllowedForAllTasks =
+        taskTracker.getTotalVirtualMemoryOnTT()
+            - taskTracker.getReservedVirtualMemory();
 
     monitoringInterval = taskTracker.getJobConf().getLong(
         "mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L);
@@ -72,9 +74,6 @@ class TaskMemoryManagerThread extends Thread {
   public void addTask(TaskAttemptID tid, long memLimit) {
     synchronized (tasksToBeAdded) {
       LOG.debug("Tracking ProcessTree " + tid + " for the first time");
-      // TODO: Negative values must have been checked in JobConf.
-      memLimit = (memLimit < 0 ? JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT
-          : memLimit);
       ProcessTreeInfo ptInfo = new ProcessTreeInfo(tid, null, null, memLimit,
           sleepTimeBeforeSigKill);
       tasksToBeAdded.put(tid, ptInfo);
@@ -203,15 +202,25 @@ class TaskMemoryManagerThread extends Thread {
         long currentMemUsage = pTree.getCumulativeVmem();
         long limit = ptInfo.getMemLimit();
         LOG.info("Memory usage of ProcessTree " + pId + " :" + currentMemUsage
-            + "kB. Limit : " + limit + "kB");
+            + "bytes. Limit : " + limit + "bytes");
+
+        if (limit > taskTracker.getLimitMaxVMemPerTask()) {
+          // TODO: With monitoring enabled and no scheduling based on
+          // memory,users can seriously hijack the system by specifying memory
+          // requirements well above the cluster wide limit. Ideally these jobs
+          // should have been rejected by JT/scheduler. Because we can't do
+          // that, in the minimum we should fail the tasks and hence the job.
+          LOG.warn("Task " + tid
+              + " 's maxVmemPerTask is greater than TT's limitMaxVmPerTask");
+        }
 
-        if (limit != JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT
+        if (limit != JobConf.DISABLED_MEMORY_LIMIT
             && currentMemUsage > limit) {
           // Task (the root process) is still alive and overflowing memory.
           // Clean up.
           String msg = "TaskTree [pid=" + pId + ",tipID=" + tid
               + "] is running beyond memory-limits. Current usage : "
-              + currentMemUsage + "kB. Limit : " + limit + "kB. Killing task.";
+              + currentMemUsage + "bytes. Limit : " + limit + "bytes. Killing task.";
           LOG.warn(msg);
           taskTracker.cleanUpOverMemoryTask(tid, true, msg);
 
@@ -227,7 +236,7 @@ class TaskMemoryManagerThread extends Thread {
       }
 
       LOG.debug("Memory still in usage across all tasks : " + memoryStillInUsage
-          + "kB. Total limit : " + maxMemoryAllowedForAllTasks);
+          + "bytes. Total limit : " + maxMemoryAllowedForAllTasks);
 
       if (memoryStillInUsage > maxMemoryAllowedForAllTasks) {
         LOG.warn("The total memory usage is still overflowing TTs limits."

+ 241 - 76
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -76,6 +76,7 @@ import org.apache.hadoop.metrics.Updater;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.RunJar;
@@ -185,10 +186,64 @@ public class TaskTracker
   volatile JvmManager jvmManager;
   
   private TaskMemoryManagerThread taskMemoryManager;
-  private boolean taskMemoryManagerEnabled = false;
-  private long maxVirtualMemoryForTasks 
-                                    = JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT;
-  
+  private boolean taskMemoryManagerEnabled = true;
+  private long totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+  private long totalPmemOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+  private long reservedVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
+  private long reservedPmem = JobConf.DISABLED_MEMORY_LIMIT;
+
+  // Cluster wide default value for max-vm per task
+  private long defaultMaxVmPerTask = JobConf.DISABLED_MEMORY_LIMIT;
+  // Cluster wide upper limit on max-vm per task
+  private long limitMaxVmPerTask = JobConf.DISABLED_MEMORY_LIMIT;
+
+  /**
+   * Configuration property to specify the amount of virtual memory that has to
+   * be reserved by the TaskTracker for system usage (OS, TT etc). The reserved
+   * virtual memory should be a part of the total virtual memory available on
+   * the TaskTracker. TaskTracker obtains the total virtual memory available on
+   * the system by using a {@link MemoryCalculatorPlugin}. The total physical
+   * memory is set to {@link JobConf#DISABLED_MEMORY_LIMIT} on systems lacking a
+   * MemoryCalculatorPlugin implementation.
+   * 
+   * <p>
+   * 
+   * The reserved virtual memory and the total virtual memory values are
+   * reported by the TaskTracker as part of heart-beat so that they can
+   * considered by a scheduler.
+   * 
+   * <p>
+   * 
+   * These two values are also used by the TaskTracker for tracking tasks'
+   * memory usage. Memory management functionality on a TaskTracker is disabled
+   * if this property is not set, if it more than the total virtual memory
+   * reported by MemoryCalculatorPlugin, or if either of the values is negative.
+   */
+  static final String MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY =
+      "mapred.tasktracker.vmem.reserved";
+
+  /**
+   * Configuration property to specify the amount of physical memory that has to
+   * be reserved by the TaskTracker for system usage (OS, TT etc). The reserved
+   * physical memory should be a part of the total physical memory available on
+   * the TaskTracker. TaskTracker obtains the total physical memory available on
+   * the system by using a {@link MemoryCalculatorPlugin}. The total physical
+   * memory is set to {@link JobConf#DISABLED_MEMORY_LIMIT} on systems lacking a
+   * MemoryCalculatorPlugin implementation.
+   * 
+   * <p>
+   * 
+   * The reserved virtual memory and the total virtual memory values are
+   * reported by the TaskTracker as part of heart-beat so that they can
+   * considered by a scheduler.
+   * 
+   */
+  static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY =
+      "mapred.tasktracker.pmem.reserved";
+
+  static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
+      "mapred.tasktracker.memory_calculator_plugin";
+
   /**
    * the minimum interval between jobtracker polls
    */
@@ -460,17 +515,11 @@ public class TaskTracker
                              "Map-events fetcher for all reduce tasks " + "on " + 
                              taskTrackerName);
     mapEventsFetcher.start();
-    maxVirtualMemoryForTasks = fConf.
-                                  getLong("mapred.tasktracker.tasks.maxmemory",
-                                          JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT);
+
+    initializeMemoryManagement();
+
     this.indexCache = new IndexCache(this.fConf);
-    // start the taskMemoryManager thread only if enabled
-    setTaskMemoryManagerEnabledFlag();
-    if (isTaskMemoryManagerEnabled()) {
-      taskMemoryManager = new TaskMemoryManagerThread(this);
-      taskMemoryManager.setDaemon(true);
-      taskMemoryManager.start();
-    }
+
     mapLauncher = new TaskLauncher(maxCurrentMapTasks);
     reduceLauncher = new TaskLauncher(maxCurrentReduceTasks);
     mapLauncher.start();
@@ -1139,12 +1188,17 @@ public class TaskTracker
     if (askForNewTask) {
       checkLocalDirs(fConf.getLocalDirs());
       askForNewTask = enoughFreeSpace(localMinSpaceStart);
-      status.getResourceStatus().setAvailableSpace( getFreeSpace() );
-      long freeVirtualMem = findFreeVirtualMemory();
-      LOG.debug("Setting amount of free virtual memory for the new task: " +
-                    freeVirtualMem);
-      status.getResourceStatus().setFreeVirtualMemory(freeVirtualMem);
-      status.getResourceStatus().setTotalMemory(maxVirtualMemoryForTasks);
+      long freeDiskSpace = getFreeSpace();
+      long totVmem = getTotalVirtualMemoryOnTT();
+      long totPmem = getTotalPhysicalMemoryOnTT();
+      long rsrvdVmem = getReservedVirtualMemory();
+      long rsrvdPmem = getReservedPhysicalMemory();
+
+      status.getResourceStatus().setAvailableSpace(freeDiskSpace);
+      status.getResourceStatus().setTotalVirtualMemory(totVmem);
+      status.getResourceStatus().setTotalPhysicalMemory(totPmem);
+      status.getResourceStatus().setReservedVirtualMemory(rsrvdVmem);
+      status.getResourceStatus().setReservedPhysicalMemory(rsrvdPmem);
     }
       
     //
@@ -1192,67 +1246,67 @@ public class TaskTracker
   }
 
   /**
-   * Return the maximum amount of memory available for all tasks on 
-   * this tracker
-   * @return maximum amount of virtual memory
+   * Return the total virtual memory available on this TaskTracker.
+   * @return total size of virtual memory.
    */
-  long getMaxVirtualMemoryForTasks() {
-    return maxVirtualMemoryForTasks;
+  long getTotalVirtualMemoryOnTT() {
+    return totalVirtualMemoryOnTT;
   }
-  
+
   /**
-   * Find the minimum amount of virtual memory that would be
-   * available for a new task.
-   * 
-   * The minimum amount of virtual memory is computed by looking
-   * at the maximum amount of virtual memory that is allowed for
-   * all tasks in the system, as per mapred.tasktracker.tasks.maxmemory,
-   * and the total amount of maximum virtual memory that can be
-   * used by all currently running tasks.
-   * 
-   * @return amount of free virtual memory that can be assured for
-   * new tasks
+   * Return the total physical memory available on this TaskTracker.
+   * @return total size of physical memory.
    */
-  private synchronized long findFreeVirtualMemory() {
-  
-    if (maxVirtualMemoryForTasks == JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT) {
-      // this will disable picking up tasks based on free memory.
-      return JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT;
-    }
-  
-    long maxMemoryUsed = 0L;
-    for (TaskInProgress tip: runningTasks.values()) {
-      // the following task states are one in which the slot is
-      // still occupied and hence memory of the task should be
-      // accounted in used memory.
-      if ((tip.getRunState() == TaskStatus.State.RUNNING)
-            || (tip.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
-        maxMemoryUsed += getMemoryForTask(tip.getJobConf());
-      }
-    }
-  
-    return (maxVirtualMemoryForTasks - maxMemoryUsed);
+  long getTotalPhysicalMemoryOnTT() {
+    return totalPmemOnTT;
+  }
+
+  /**
+   * Return the amount of virtual memory reserved on the TaskTracker for system
+   * usage (OS, TT etc).
+   */
+  long getReservedVirtualMemory() {
+    return reservedVirtualMemory;
   }
 
   /**
-   * Return the memory allocated for a TIP.
+   * Return the amount of physical memory reserved on the TaskTracker for system
+   * usage (OS, TT etc).
+   */
+  long getReservedPhysicalMemory() {
+    return reservedPmem;
+  }
+
+  /**
+   * Return the limit on the maxVMemPerTask on this TaskTracker
+   * @return limitMaxVmPerTask
+   */
+  long getLimitMaxVMemPerTask() {
+    return limitMaxVmPerTask;
+  }
+
+  /**
+   * Obtain the virtual memory allocated for a TIP.
+   * 
+   * If the TIP's job has a configured value for the max-virtual memory, that
+   * will be returned. Else, the cluster-wide default maxvirtual memory for
+   * tasks is returned.
    * 
-   * If the TIP's job has a configured value for the max memory that is
-   * returned. Else, the default memory that would be assigned for the
-   * task is returned.
    * @param conf
-   * @return the memory allocated for the TIP.
+   * @return the virtual memory allocated for the TIP.
    */
-  long getMemoryForTask(JobConf conf) {
-    long memForTask = conf.getMaxVirtualMemoryForTask();
-    if (memForTask == JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT) {
-      memForTask = fConf.getLong("mapred.task.default.maxmemory",
-                          512*1024*1024L);
-    }
-    return memForTask;
-  }  
-  
-  
+  long getVirtualMemoryForTask(JobConf conf) {
+    long vMemForTask =
+        normalizeMemoryConfigValue(conf.getMaxVirtualMemoryForTask());
+    if (vMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
+      vMemForTask =
+          normalizeMemoryConfigValue(fConf.getLong(
+              JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+              JobConf.DISABLED_MEMORY_LIMIT));
+    }
+    return vMemForTask;
+  }
+
   /**
    * Check if the jobtracker directed a 'reset' of the tasktracker.
    * 
@@ -1634,7 +1688,7 @@ public class TaskTracker
       localizeJob(tip);
       if (isTaskMemoryManagerEnabled()) {
         taskMemoryManager.addTask(tip.getTask().getTaskID(), 
-            getMemoryForTask(tip.getJobConf()));
+            getVirtualMemoryForTask(tip.getJobConf()));
       }
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
@@ -2931,6 +2985,75 @@ public class TaskTracker
     return taskMemoryManager;
   }
 
+  /**
+   * Normalize the negative values in configuration
+   * 
+   * @param val
+   * @return normalized val
+   */
+  private long normalizeMemoryConfigValue(long val) {
+    if (val < 0) {
+      val = JobConf.DISABLED_MEMORY_LIMIT;
+    }
+    return val;
+  }
+
+  /**
+   * Memory-related setup
+   */
+  private void initializeMemoryManagement() {
+    Class<? extends MemoryCalculatorPlugin> clazz =
+        fConf.getClass(MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
+            null, MemoryCalculatorPlugin.class);
+    MemoryCalculatorPlugin memoryCalculatorPlugin =
+        (MemoryCalculatorPlugin) MemoryCalculatorPlugin
+            .getMemoryCalculatorPlugin(clazz, fConf);
+    LOG.info(" Using MemoryCalculatorPlugin : " + memoryCalculatorPlugin);
+
+    if (memoryCalculatorPlugin != null) {
+      totalVirtualMemoryOnTT = memoryCalculatorPlugin.getVirtualMemorySize();
+      if (totalVirtualMemoryOnTT <= 0) {
+        LOG.warn("TaskTracker's totalVmem could not be calculated. "
+            + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
+        totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+      }
+      totalPmemOnTT = memoryCalculatorPlugin.getPhysicalMemorySize();
+      if (totalPmemOnTT <= 0) {
+        LOG.warn("TaskTracker's totalPmem could not be calculated. "
+            + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
+        totalPmemOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+      }
+    }
+
+    reservedVirtualMemory =
+        normalizeMemoryConfigValue(fConf.getLong(
+            TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+
+    reservedPmem =
+        normalizeMemoryConfigValue(fConf.getLong(
+            TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+
+    defaultMaxVmPerTask =
+        normalizeMemoryConfigValue(fConf.getLong(
+            JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+
+    limitMaxVmPerTask =
+        normalizeMemoryConfigValue(fConf.getLong(
+            JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+
+    // start the taskMemoryManager thread only if enabled
+    setTaskMemoryManagerEnabledFlag();
+    if (isTaskMemoryManagerEnabled()) {
+      taskMemoryManager = new TaskMemoryManagerThread(this);
+      taskMemoryManager.setDaemon(true);
+      taskMemoryManager.start();
+    }
+  }
+
   private void setTaskMemoryManagerEnabledFlag() {
     if (!ProcfsBasedProcessTree.isAvailable()) {
       LOG.info("ProcessTree implementation is missing on this system. "
@@ -2939,13 +3062,55 @@ public class TaskTracker
       return;
     }
 
-    Long tasksMaxMem = getMaxVirtualMemoryForTasks();
-    if (tasksMaxMem == JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT) {
-      LOG.info("TaskTracker's tasksMaxMem is not set. TaskMemoryManager is "
-          + "disabled.");
+    // /// Missing configuration
+    StringBuilder mesg = new StringBuilder();
+
+    long totalVmemOnTT = getTotalVirtualMemoryOnTT();
+    if (totalVmemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
+      mesg.append("TaskTracker's totalVmem could not be calculated.\n");
+      taskMemoryManagerEnabled = false;
+    }
+
+    long reservedVmem = getReservedVirtualMemory();
+    if (reservedVmem == JobConf.DISABLED_MEMORY_LIMIT) {
+      mesg.append("TaskTracker's reservedVmem is not configured.\n");
+      taskMemoryManagerEnabled = false;
+    }
+
+    if (defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
+      mesg.append("TaskTracker's defaultMaxVmPerTask is not configured.\n");
+      taskMemoryManagerEnabled = false;
+    }
+
+    if (limitMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
+      mesg.append("TaskTracker's limitMaxVmPerTask is not configured.\n");
       taskMemoryManagerEnabled = false;
+    }
+
+    if (!taskMemoryManagerEnabled) {
+      LOG.warn(mesg.toString() + "TaskMemoryManager is disabled.");
+      return;
+    }
+    // ///// End of missing configuration
+
+    // ///// Mis-configuration
+    if (defaultMaxVmPerTask > limitMaxVmPerTask) {
+      mesg.append("defaultMaxVmPerTask is mis-configured. "
+          + "It shouldn't be greater than limitMaxVmPerTask. ");
+      taskMemoryManagerEnabled = false;
+    }
+
+    if (reservedVmem > totalVmemOnTT) {
+      mesg.append("reservedVmemOnTT is mis-configured. "
+          + "It shouldn't be greater than totalVmemOnTT");
+      taskMemoryManagerEnabled = false;
+    }
+
+    if (!taskMemoryManagerEnabled) {
+      LOG.warn(mesg.toString() + "TaskMemoryManager is disabled.");
       return;
     }
+    // ///// End of mis-configuration
 
     taskMemoryManagerEnabled = true;
   }

+ 18 - 1
src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.IOException;
 import java.util.Collection;
 
 /**
@@ -70,5 +71,21 @@ interface TaskTrackerManager {
    * @return the heartbeat interval used by {@link TaskTracker}s
    */
   public int getNextHeartbeatInterval();
-  
+
+  /**
+   * Kill the job identified by jobid
+   * 
+   * @param jobid
+   * @throws IOException
+   */
+  public void killJob(JobID jobid)
+      throws IOException;
+
+  /**
+   * Obtain the job object identified by jobid
+   * 
+   * @param jobid
+   * @return jobInProgress object
+   */
+  public JobInProgress getJob(JobID jobid);
 }

+ 79 - 35
src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java

@@ -54,59 +54,99 @@ class TaskTrackerStatus implements Writable {
    */
   static class ResourceStatus implements Writable {
     
-    private long freeVirtualMemory;
-    private long totalMemory;
+    private long totalVirtualMemory;
+    private long reservedVirtualMemory;
+    private long totalPhysicalMemory;
+    private long reservedPhysicalMemory;
     private long availableSpace;
     
     ResourceStatus() {
-      freeVirtualMemory = JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT;
-      totalMemory = JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT;
+      totalVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
+      reservedVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
+      totalPhysicalMemory = JobConf.DISABLED_MEMORY_LIMIT;
+      reservedPhysicalMemory = JobConf.DISABLED_MEMORY_LIMIT;
       availableSpace = Long.MAX_VALUE;
     }
-    
+
     /**
-     * Set the amount of free virtual memory that is available for running
-     * a new task
-     * @param freeVMem amount of free virtual memory in kilobytes
+     * Set the maximum amount of virtual memory on the tasktracker.
+     * 
+     * @param vmem maximum amount of virtual memory on the tasktracker in bytes.
      */
-    void setFreeVirtualMemory(long freeVmem) {
-      freeVirtualMemory = freeVmem;
+    void setTotalVirtualMemory(long totalMem) {
+      totalVirtualMemory = totalMem;
     }
 
     /**
-     * Get the amount of free virtual memory that will be available for
-     * running a new task. 
+     * Get the maximum amount of virtual memory on the tasktracker.
      * 
-     * If this is {@link JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT}, it should 
-     * be ignored and not used in computation.
+     * If this is {@link JobConf#DISABLED_MEMORY_LIMIT}, it should be ignored
+     * and not used in any computation.
      * 
-     *@return amount of free virtual memory in kilobytes.
+     * @return the maximum amount of virtual memory on the tasktracker in bytes.
      */
-    long getFreeVirtualMemory() {
-      return freeVirtualMemory;
+    long getTotalVirtualMemory() {
+      return totalVirtualMemory;
     }
 
     /**
-     * Set the maximum amount of virtual memory on the tasktracker.
-     * @param vmem maximum amount of virtual memory on the tasktracker in kilobytes.
+     * Set the amount of virtual memory reserved on the TaskTracker for system
+     * usage (OS, TT etc).
+     * 
+     * @param reservedVmem amount of virtual memory reserved in bytes.
      */
-    void setTotalMemory(long totalMem) {
-      totalMemory = totalMem;
+    void setReservedVirtualMemory(long reservedVmem) {
+      reservedVirtualMemory = reservedVmem;
     }
-    
+
     /**
-     * Get the maximum amount of virtual memory on the tasktracker.
+     * Get the amount of virtual memory reserved on the TaskTracker for system
+     * usage (OS, TT etc).
+     */
+    long getReservedTotalMemory() {
+      return reservedVirtualMemory;
+    }
+
+    /**
+     * Set the maximum amount of physical memory on the tasktracker.
      * 
-     * If this is
-     * {@link JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT}, it should be ignored 
+     * @param totalRAM maximum amount of physical memory on the tasktracker in
+     *          bytes.
+     */
+    void setTotalPhysicalMemory(long totalRAM) {
+      totalPhysicalMemory = totalRAM;
+    }
+
+    /**
+     * Get the maximum amount of physical memory on the tasktracker.
+     * 
+     * If this is {@link JobConf#DISABLED_MEMORY_LIMIT}, it should be ignored
      * and not used in any computation.
      * 
-     * @return maximum amount of virtual memory on the tasktracker in kilobytes. 
-     */    
-    long getTotalMemory() {
-      return totalMemory;
+     * @return maximum amount of physical memory on the tasktracker in bytes.
+     */
+    long getTotalPhysicalMemory() {
+      return totalPhysicalMemory;
     }
-    
+
+    /**
+     * Set the amount of physical memory reserved on the TaskTracker for system
+     * usage (OS, TT etc).
+     * 
+     * @param reservedPmem amount of physical memory reserved in bytes.
+     */
+    void setReservedPhysicalMemory(long reservedPmem) {
+      reservedPhysicalMemory = reservedPmem;
+    }
+
+    /**
+     * Get the amount of physical memory reserved on the TaskTracker for system
+     * usage (OS, TT etc).
+     */
+    long getReservedPhysicalMemory() {
+      return reservedPhysicalMemory;
+    }
+
     void setAvailableSpace(long availSpace) {
       availableSpace = availSpace;
     }
@@ -120,15 +160,19 @@ class TaskTrackerStatus implements Writable {
     }
     
     public void write(DataOutput out) throws IOException {
-      WritableUtils.writeVLong(out, freeVirtualMemory);
-      WritableUtils.writeVLong(out, totalMemory);
+      WritableUtils.writeVLong(out, totalVirtualMemory);
+      WritableUtils.writeVLong(out, reservedVirtualMemory);
+      WritableUtils.writeVLong(out, totalPhysicalMemory);
+      WritableUtils.writeVLong(out, reservedPhysicalMemory);
       WritableUtils.writeVLong(out, availableSpace);
     }
     
     public void readFields(DataInput in) throws IOException {
-      freeVirtualMemory = WritableUtils.readVLong(in);;
-      totalMemory = WritableUtils.readVLong(in);;
-      availableSpace = WritableUtils.readVLong(in);;
+      totalVirtualMemory = WritableUtils.readVLong(in);
+      reservedVirtualMemory = WritableUtils.readVLong(in);
+      totalPhysicalMemory = WritableUtils.readVLong(in);
+      reservedPhysicalMemory = WritableUtils.readVLong(in);
+      availableSpace = WritableUtils.readVLong(in);
     }
   }
   

+ 49 - 0
src/test/org/apache/hadoop/mapred/DummyMemoryCalculatorPlugin.java

@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.util.MemoryCalculatorPlugin;
+
+/**
+ * Plugin class to test virtual and physical memories reported by TT. Use
+ * configuration items {@link #MAXVMEM_TESTING_PROPERTY} and
+ * {@link #MAXPMEM_TESTING_PROPERTY} to tell TT the total vmem and the total
+ * pmem.
+ */
+public class DummyMemoryCalculatorPlugin extends MemoryCalculatorPlugin {
+
+  /** max vmem on the TT */
+  public static final String MAXVMEM_TESTING_PROPERTY =
+      "mapred.tasktracker.maxvmem.testing";
+  /** max pmem on the TT */
+  public static final String MAXPMEM_TESTING_PROPERTY =
+      "mapred.tasktracker.maxpmem.testing";
+
+  /** {@inheritDoc} */
+  @Override
+  public long getVirtualMemorySize() {
+    return getConf().getLong(MAXVMEM_TESTING_PROPERTY, -1);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getPhysicalMemorySize() {
+    return getConf().getLong(MAXPMEM_TESTING_PROPERTY, -1);
+  }
+}

+ 0 - 242
src/test/org/apache/hadoop/mapred/TestHighRAMJobs.java

@@ -1,242 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.SleepJob;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.util.ToolRunner;
-
-import junit.framework.TestCase;
-
-/**
- * This test class tests the functionality related to configuring, reporting
- * and computing memory related parameters in a Map/Reduce cluster.
- * 
- * Each test sets up a {@link MiniMRCluster} with a locally defined 
- * {@link org.apache.hadoop.mapred.TaskScheduler}. This scheduler validates 
- * the memory related configuration is correctly computed and reported from 
- * the tasktracker in 
- * {@link org.apache.hadoop.mapred.TaskScheduler.assignTasks()}.
- *  
- */
-public class TestHighRAMJobs extends TestCase {
-
-  private static final Log LOG = LogFactory.getLog(TestHighRAMJobs.class);
-
-  private static final String DEFAULT_SLEEP_JOB_MAP_COUNT = "1";
-  private static final String DEFAULT_SLEEP_JOB_REDUCE_COUNT = "1";
-  private static final String DEFAULT_MAP_SLEEP_TIME = "1000";
-  private static final String DEFAULT_REDUCE_SLEEP_TIME = "1000";
-  private static final long DISABLED_VIRTUAL_MEMORY_LIMIT = -1L;
-  
-  private MiniDFSCluster miniDFSCluster;
-  private MiniMRCluster miniMRCluster;
-  
-  public static class FakeTaskScheduler extends JobQueueTaskScheduler {
-    
-    private boolean hasPassed = true;
-    private String message;
-    private boolean isFirstTime = true;
-    
-    public FakeTaskScheduler() {
-      super();
-    }
-    
-    public boolean hasTestPassed() {
-      return hasPassed;
-    }
-    
-    public String getFailureMessage() {
-      return message;
-    }
-    
-    @Override
-    public List<Task> assignTasks(TaskTrackerStatus status) 
-                                          throws IOException {
-      TestHighRAMJobs.LOG.info("status = " + status.getResourceStatus().getFreeVirtualMemory());
-
-      long initialFreeMemory = getConf().getLong("initialFreeMemory", 0L);
-      long totalMemoryOnTT = getConf().getLong("totalMemoryOnTT", 0L);
-
-      if (isFirstTime) {
-        isFirstTime = false;
-        if (initialFreeMemory != status.getResourceStatus().getFreeVirtualMemory()) {
-          hasPassed = false;
-          message = "Initial memory expected = " + initialFreeMemory
-                      + " reported = " + status.getResourceStatus().getFreeVirtualMemory();
-        }
-        if (totalMemoryOnTT != status.getResourceStatus().getTotalMemory()) {
-          hasPassed = false;
-          message = "Total memory on TT expected = " + totalMemoryOnTT
-                      + " reported = " 
-                      + status.getResourceStatus().getTotalMemory();
-        }
-      } else if (initialFreeMemory != DISABLED_VIRTUAL_MEMORY_LIMIT) {
-        
-        long memoryPerTask = getConf().getLong("memoryPerTask", 0L);
-          
-        long expectedFreeMemory = 0;
-        int runningTaskCount = status.countMapTasks() +
-                              status.countReduceTasks();
-        expectedFreeMemory = initialFreeMemory - 
-                                (memoryPerTask * runningTaskCount);
-
-        TestHighRAMJobs.LOG.info("expected free memory = " + 
-                                  expectedFreeMemory + ", reported = " + 
-                                  status.getResourceStatus().getFreeVirtualMemory());
-        if (expectedFreeMemory != status.getResourceStatus().getFreeVirtualMemory()) {
-          hasPassed = false;
-          message = "Expected free memory after " + runningTaskCount
-                      + " tasks are scheduled = " + expectedFreeMemory
-                      + ", reported = " + status.getResourceStatus().getFreeVirtualMemory();
-        }
-      }
-      return super.assignTasks(status);
-    }
-  }
-  
-  /* Test that verifies default values are configured and reported
-   * correctly.
-   */
-  public void testDefaultValuesForHighRAMJobs() throws Exception {
-    long defaultMemoryLimit = DISABLED_VIRTUAL_MEMORY_LIMIT;
-    try {
-      setUpCluster(defaultMemoryLimit, defaultMemoryLimit, null);
-      runJob(defaultMemoryLimit, DEFAULT_MAP_SLEEP_TIME, 
-          DEFAULT_REDUCE_SLEEP_TIME, DEFAULT_SLEEP_JOB_MAP_COUNT, 
-          DEFAULT_SLEEP_JOB_REDUCE_COUNT);
-      verifyTestResults();
-    } finally {
-      tearDownCluster();
-    }
-  }
-  
-  /* Test that verifies default value for memory per task on TT
-   * when the number of slots is non-default.
-   */
-  public void testDefaultMemoryPerTask() throws Exception {
-    long maxVmem = 2*1024*1024*1024L;
-    JobConf conf = new JobConf();
-    conf.setInt("mapred.tasktracker.map.tasks.maximum", 2);
-    conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 2);
-    // set a different value for the default memory per task
-    long defaultMemPerTask = 256*1024*1024L; 
-    try {
-      setUpCluster(maxVmem, defaultMemPerTask, 
-                    defaultMemPerTask, conf);
-      runJob(DISABLED_VIRTUAL_MEMORY_LIMIT, "10000",
-              DEFAULT_REDUCE_SLEEP_TIME, DEFAULT_SLEEP_JOB_MAP_COUNT,
-              DEFAULT_SLEEP_JOB_REDUCE_COUNT);
-      verifyTestResults();
-    } finally {
-      tearDownCluster();
-    }
-  }
-  
-  public void testHighRAMJob() throws Exception {
-    long maxVmem = 1024*1024*1024L;
-    //long defaultMemPerTaskOnTT = maxVmem/4; // 4 = default number of slots.
-    /* Set a HIGH RAM requirement for a job. As 4 is the
-     * default number of slots, we set up the memory limit
-     * per task to be more than 25%. 
-     */
-    long maxVmemPerTask = maxVmem/3;
-    try {
-      setUpCluster(maxVmem, maxVmemPerTask, null);
-      /* set up sleep limits higher, so the scheduler will see varying
-       * number of running tasks at a time. Also modify the number of
-       * map tasks so we test the iteration over more than one task.
-       */
-      runJob(maxVmemPerTask, "10000", "10000", "2", 
-                      DEFAULT_SLEEP_JOB_REDUCE_COUNT);
-      verifyTestResults();
-    } finally {
-      tearDownCluster();
-    }
-  }
-  
-  private void setUpCluster(long totalMemoryOnTT, long memoryPerTask,
-                              JobConf conf) throws Exception {
-    this.setUpCluster(totalMemoryOnTT, 512*1024*1024L, 
-                          memoryPerTask, conf);
-  }
-  
-  private void setUpCluster(long totalMemoryOnTT, long defaultMemoryPerTask,
-                              long memoryPerTask, JobConf conf)
-                                throws Exception {
-    if (conf == null) {
-      conf = new JobConf();
-    }
-    conf.setClass("mapred.jobtracker.taskScheduler", 
-        TestHighRAMJobs.FakeTaskScheduler.class,
-        TaskScheduler.class);
-    if (totalMemoryOnTT != -1L) {
-      conf.setLong("mapred.tasktracker.tasks.maxmemory", totalMemoryOnTT);  
-    }
-    conf.setLong("mapred.task.default.maxmemory", defaultMemoryPerTask);
-    conf.setLong("initialFreeMemory", totalMemoryOnTT);
-    conf.setLong("totalMemoryOnTT", totalMemoryOnTT);
-    conf.setLong("memoryPerTask", memoryPerTask);
-    miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
-    FileSystem fileSys = miniDFSCluster.getFileSystem();
-    String namenode = fileSys.getUri().toString();
-    miniMRCluster = new MiniMRCluster(1, namenode, 3, 
-                      null, null, conf);    
-  }
-  
-  private void runJob(long memoryPerTask, String mapSleepTime,
-                        String reduceSleepTime, String mapTaskCount,
-                        String reduceTaskCount) 
-                                        throws Exception {
-    Configuration sleepJobConf = new Configuration();
-    sleepJobConf.set("mapred.job.tracker", "localhost:"
-                              + miniMRCluster.getJobTrackerPort());
-    if (memoryPerTask != -1L) {
-      sleepJobConf.setLong("mapred.task.maxmemory", memoryPerTask);
-    }
-    launchSleepJob(mapSleepTime, reduceSleepTime, 
-                    mapTaskCount, reduceTaskCount, sleepJobConf);    
-  }
-
-  private void launchSleepJob(String mapSleepTime, String reduceSleepTime,
-                              String mapTaskCount, String reduceTaskCount,
-                              Configuration conf) throws Exception {
-    String[] args = { "-m", mapTaskCount, "-r", reduceTaskCount,
-                      "-mt", mapSleepTime, "-rt", reduceSleepTime };
-    ToolRunner.run(conf, new SleepJob(), args);
-  }
-
-  private void verifyTestResults() {
-    FakeTaskScheduler scheduler = 
-      (FakeTaskScheduler)miniMRCluster.getJobTrackerRunner().
-                              getJobTracker().getTaskScheduler();
-    assertTrue(scheduler.getFailureMessage(), scheduler.hasTestPassed());
-  }
-  
-  private void tearDownCluster() {
-    if (miniMRCluster != null) { miniMRCluster.shutdown(); }
-    if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
-  }
-}

+ 12 - 2
src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java

@@ -152,10 +152,20 @@ public class TestJobQueueTaskScheduler extends TestCase {
     public int getNextHeartbeatInterval() {
       return MRConstants.HEARTBEAT_INTERVAL_MIN;
     }
-    
+
+    @Override
+    public void killJob(JobID jobid) {
+      return;
+    }
+
+    @Override
+    public JobInProgress getJob(JobID jobid) {
+      return null;
+    }
+
     // Test methods
     
-    public void submitJob(JobInProgress job) {
+    public void submitJob(JobInProgress job) throws IOException {
       for (JobInProgressListener listener : listeners) {
         listener.jobAdded(job);
       }

+ 241 - 0
src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java

@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.LinuxMemoryCalculatorPlugin;
+import org.apache.hadoop.util.MemoryCalculatorPlugin;
+import org.apache.hadoop.util.ToolRunner;
+
+import junit.framework.TestCase;
+
+/**
+ * This test class tests the functionality related to configuring, reporting
+ * and computing memory related parameters in a Map/Reduce cluster.
+ * 
+ * Each test sets up a {@link MiniMRCluster} with a locally defined 
+ * {@link org.apache.hadoop.mapred.TaskScheduler}. This scheduler validates 
+ * the memory related configuration is correctly computed and reported from 
+ * the tasktracker in 
+ * {@link org.apache.hadoop.mapred.TaskScheduler#assignTasks(TaskTrackerStatus)}.
+ */
+public class TestTTMemoryReporting extends TestCase {
+
+  static final Log LOG = LogFactory.getLog(TestTTMemoryReporting.class);
+  
+  private MiniDFSCluster miniDFSCluster;
+  private MiniMRCluster miniMRCluster;
+
+  /**
+   * Fake scheduler to test the proper reporting of memory values by TT
+   */
+  public static class FakeTaskScheduler extends JobQueueTaskScheduler {
+    
+    private boolean hasPassed = true;
+    private String message;
+    
+    public FakeTaskScheduler() {
+      super();
+    }
+    
+    public boolean hasTestPassed() {
+      return hasPassed;
+    }
+    
+    public String getFailureMessage() {
+      return message;
+    }
+    
+    @Override
+    public List<Task> assignTasks(TaskTrackerStatus status)
+        throws IOException {
+
+      long totalVirtualMemoryOnTT =
+          getConf().getLong("totalVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
+      long totalPhysicalMemoryOnTT =
+          getConf().getLong("totalPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
+      long virtualMemoryReservedOnTT =
+          getConf().getLong("reservedVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
+      long physicalMemoryReservedOnTT =
+          getConf().getLong("reservedPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
+
+      long reportedTotalVirtualMemoryOnTT =
+          status.getResourceStatus().getTotalVirtualMemory();
+      long reportedTotalPhysicalMemoryOnTT =
+          status.getResourceStatus().getTotalPhysicalMemory();
+      long reportedVirtualMemoryReservedOnTT =
+          status.getResourceStatus().getReservedTotalMemory();
+      long reportedPhysicalMemoryReservedOnTT =
+          status.getResourceStatus().getReservedPhysicalMemory();
+
+      message =
+          "expected memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
+              + "virtualMemoryReservedOnTT, physicalMemoryReservedOnTT) = ("
+              + totalVirtualMemoryOnTT + ", " + totalPhysicalMemoryOnTT + ", "
+              + virtualMemoryReservedOnTT + ", " + physicalMemoryReservedOnTT
+              + ")";
+      message +=
+          "\nreported memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
+              + "virtualMemoryReservedOnTT, physicalMemoryReservedOnTT) = ("
+              + reportedTotalVirtualMemoryOnTT
+              + ", "
+              + reportedTotalPhysicalMemoryOnTT
+              + ", "
+              + reportedVirtualMemoryReservedOnTT
+              + ", "
+              + reportedPhysicalMemoryReservedOnTT + ")";
+      LOG.info(message);
+      if (totalVirtualMemoryOnTT != reportedTotalVirtualMemoryOnTT
+          || totalPhysicalMemoryOnTT != reportedTotalPhysicalMemoryOnTT
+          || virtualMemoryReservedOnTT != reportedVirtualMemoryReservedOnTT
+          || physicalMemoryReservedOnTT != reportedPhysicalMemoryReservedOnTT) {
+        hasPassed = false;
+      }
+      return super.assignTasks(status);
+    }
+  }
+
+  /**
+   * Test that verifies default values are configured and reported correctly.
+   * 
+   * @throws Exception
+   */
+  public void testDefaultMemoryValues()
+      throws Exception {
+    JobConf conf = new JobConf();
+    try {
+      // Memory values are disabled by default.
+      conf.setClass(
+          TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
+          DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
+      setUpCluster(conf);
+      runSleepJob();
+      verifyTestResults();
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+  /**
+   * Test that verifies that configured values are reported correctly.
+   * 
+   * @throws Exception
+   */
+  public void testConfiguredMemoryValues()
+      throws Exception {
+    JobConf conf = new JobConf();
+    conf.setLong("totalVmemOnTT", 4 * 1024 * 1024 * 1024L);
+    conf.setLong("totalPmemOnTT", 2 * 1024 * 1024 * 1024L);
+    conf.setLong("reservedVmemOnTT", 1 * 1024 * 1024 * 1024L);
+    conf.setLong("reservedPmemOnTT", 512 * 1024 * 1024L);
+    conf.setClass(
+        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
+        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
+    conf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
+        4 * 1024 * 1024 * 1024L);
+    conf.setLong(DummyMemoryCalculatorPlugin.MAXPMEM_TESTING_PROPERTY,
+        2 * 1024 * 1024 * 1024L);
+    conf.setLong(
+        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
+        1 * 1024 * 1024 * 1024L);
+    conf.setLong(
+        TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
+        512 * 1024 * 1024L);
+    try {
+      setUpCluster(conf);
+      runSleepJob();
+      verifyTestResults();
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+  /**
+   * Test that verifies that total memory values are calculated and reported
+   * correctly.
+   * 
+   * @throws Exception
+   */
+  public void testMemoryValuesOnLinux()
+      throws Exception {
+    if (!System.getProperty("os.name").startsWith("Linux")) {
+      return;
+    }
+
+    JobConf conf = new JobConf();
+    LinuxMemoryCalculatorPlugin plugin = new LinuxMemoryCalculatorPlugin();
+    conf.setLong("totalVmemOnTT", plugin.getVirtualMemorySize());
+    conf.setLong("totalPmemOnTT", plugin.getPhysicalMemorySize());
+    conf.setLong("reservedVmemOnTT", 1 * 1024 * 1024 * 1024L);
+    conf.setLong("reservedPmemOnTT", 512 * 1024 * 1024L);
+    conf.setLong(
+        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
+        1 * 1024 * 1024 * 1024L);
+    conf.setLong(
+        TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
+        512 * 1024 * 1024L);
+    try {
+      setUpCluster(conf);
+      runSleepJob();
+      verifyTestResults();
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+  private void setUpCluster(JobConf conf)
+                                throws Exception {
+    conf.setClass("mapred.jobtracker.taskScheduler", 
+        TestTTMemoryReporting.FakeTaskScheduler.class,
+        TaskScheduler.class);
+    miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fileSys = miniDFSCluster.getFileSystem();
+    String namenode = fileSys.getUri().toString();
+    miniMRCluster = new MiniMRCluster(1, namenode, 3, 
+                      null, null, conf);    
+  }
+  
+  private void runSleepJob() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("mapred.job.tracker", "localhost:"
+                              + miniMRCluster.getJobTrackerPort());
+    String[] args = { "-m", "1", "-r", "1",
+                      "-mt", "1000", "-rt", "1000" };
+    ToolRunner.run(conf, new SleepJob(), args);
+  }
+
+  private void verifyTestResults() {
+    FakeTaskScheduler scheduler = 
+      (FakeTaskScheduler)miniMRCluster.getJobTrackerRunner().
+                              getJobTracker().getTaskScheduler();
+    assertTrue(scheduler.getFailureMessage(), scheduler.hasTestPassed());
+  }
+  
+  private void tearDownCluster() {
+    if (miniMRCluster != null) { miniMRCluster.shutdown(); }
+    if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
+  }
+}

+ 57 - 17
src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java

@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
@@ -46,7 +47,7 @@ public class TestTaskTrackerMemoryManager extends TestCase {
 
   private String taskOverLimitPatternString =
       "TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond memory-limits. "
-          + "Current usage : [0-9]*kB. Limit : %skB. Killing task.";
+          + "Current usage : [0-9]*bytes. Limit : %sbytes. Killing task.";
 
   private void startCluster(JobConf conf) throws Exception {
     miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
@@ -168,14 +169,21 @@ public class TestTaskTrackerMemoryManager extends TestCase {
       return;
     }
 
+    // Fairly large value for sleepJob to succeed
+    long ttLimit = 4 * 1024 * 1024 * 1024L;
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
 
-    // Fairly large value for sleepJob to succeed
-    fConf.setLong("mapred.tasktracker.tasks.maxmemory", 10000000000L);
+    fConf.setClass(
+        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
+        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
+    fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
+        ttLimit);
+    fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, ttLimit);
+    fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, ttLimit);
+    fConf.setLong(
+        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
     startCluster(fConf);
-
-    // Set up job.
     JobConf conf = new JobConf();
     runAndCheckSuccessfulJob(conf);
   }
@@ -193,19 +201,29 @@ public class TestTaskTrackerMemoryManager extends TestCase {
       return;
     }
 
-    long PER_TASK_LIMIT = 10000000000L; // Large so sleepjob goes through.
-    long TASK_TRACKER_LIMIT = 10000000000L; // Large so as to fit total usage
+    // Large so that sleepjob goes through and fits total TT usage
+    long PER_TASK_LIMIT = 2 * 1024 * 1024 * 1024L;
+    long TASK_TRACKER_LIMIT = 4 * 1024 * 1024 * 1024L;
 
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
 
-    // Fairly large value for sleepjob to succeed
-    fConf.setLong("mapred.tasktracker.tasks.maxmemory", TASK_TRACKER_LIMIT);
+    fConf.setClass(
+        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
+        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
+    fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
+        TASK_TRACKER_LIMIT);
+    fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+        TASK_TRACKER_LIMIT);
+    fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+        TASK_TRACKER_LIMIT);
+    fConf.setLong(
+        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
     startCluster(fConf);
-
     JobConf conf = new JobConf();
     conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
     runAndCheckSuccessfulJob(conf);
+
   }
 
   /**
@@ -223,7 +241,8 @@ public class TestTaskTrackerMemoryManager extends TestCase {
     }
 
     long PER_TASK_LIMIT = 444; // Low enough to kill off sleepJob tasks.
-    long TASK_TRACKER_LIMIT = 10000000000L; // Large so as to fit total usage
+    long TASK_TRACKER_LIMIT = 4 * 1024 * 1024 * 1024L; // Large so as to fit
+    // total usage
     Pattern taskOverLimitPattern =
         Pattern.compile(String.format(taskOverLimitPatternString, String
             .valueOf(PER_TASK_LIMIT)));
@@ -231,7 +250,17 @@ public class TestTaskTrackerMemoryManager extends TestCase {
 
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
-    fConf.setLong("mapred.tasktracker.tasks.maxmemory", TASK_TRACKER_LIMIT);
+    fConf.setClass(
+        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
+        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
+    fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
+        TASK_TRACKER_LIMIT);
+    fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+        TASK_TRACKER_LIMIT);
+    fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+        TASK_TRACKER_LIMIT);
+    fConf.setLong(
+        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
 
     // very small value, so that no task escapes to successful completion.
     fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
@@ -313,14 +342,25 @@ public class TestTaskTrackerMemoryManager extends TestCase {
         Pattern.compile(String.format(taskOverLimitPatternString, String
             .valueOf(PER_TASK_LIMIT)));
     Pattern trackerOverLimitPattern =
-        Pattern.compile("Killing one of the least progress tasks - .*, as "
-            + "the cumulative memory usage of all the tasks on the TaskTracker"
-            + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + ".");
+        Pattern
+            .compile("Killing one of the least progress tasks - .*, as "
+                + "the cumulative memory usage of all the tasks on the TaskTracker"
+                + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + ".");
     Matcher mat = null;
 
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
-    fConf.setLong("mapred.tasktracker.tasks.maxmemory", TASK_TRACKER_LIMIT);
+    fConf.setClass(
+        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
+        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
+    fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
+        TASK_TRACKER_LIMIT);
+    fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+        TASK_TRACKER_LIMIT);
+    fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+        TASK_TRACKER_LIMIT);
+    fConf.setLong(
+        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
     // very small value, so that no task escapes to successful completion.
     fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
         String.valueOf(300));
@@ -374,4 +414,4 @@ public class TestTaskTrackerMemoryManager extends TestCase {
     // Test succeeded, kill the job.
     job.killJob();
   }
-}
+}

+ 2 - 2
src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java

@@ -39,7 +39,7 @@ public class TestProcfsBasedProcessTree extends TestCase {
   private String shellScript;
   private static final int N = 10; // Controls the RogueTask
 
-  private static final int memoryLimit = 15000; // kilobytes
+  private static final int memoryLimit = 15 * 1024 * 1024; // 15MB
   private static final long PROCESSTREE_RECONSTRUCTION_INTERVAL =
     ProcfsBasedProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL; // msec
 
@@ -125,7 +125,7 @@ public class TestProcfsBasedProcessTree extends TestCase {
       while (true) {
         LOG.info("ProcessTree: " + p.toString());
         long mem = p.getCumulativeVmem();
-        LOG.info("Memory usage: " + mem + "kB.");
+        LOG.info("Memory usage: " + mem + "bytes.");
         if (mem > memoryLimit) {
           p.destroy();
           break;