Browse Source

HADOOP-5881. Simplify memory monitoring and scheduling related configuration. Contributed by Vinod Kumar Vavilapalli.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@778700 13f79535-47bb-0310-9956-ffa450edef68
Hemanth Yamijala 16 years ago
parent
commit
828007f1a7

+ 3 - 0
CHANGES.txt

@@ -7,6 +7,9 @@ Release 0.20.1 - Unreleased
     HADOOP-5726. Remove pre-emption from capacity scheduler code base.
     (Rahul Kumar Singh via yhemanth)
 
+    HADOOP-5881. Simplify memory monitoring and scheduling related
+    configuration. (Vinod Kumar Vavilapalli via yhemanth)
+
   NEW FEATURES
 
   IMPROVEMENTS

+ 0 - 28
conf/capacity-scheduler.xml.template

@@ -56,34 +56,6 @@
       account in scheduling decisions by default in a job queue.
     </description>
   </property>
-
-  <property>
-    <name>mapred.capacity-scheduler.task.default-pmem-percentage-in-vmem</name>
-    <value>-1</value>
-    <description>A percentage (float) of the default VM limit for jobs
-   	  (mapred.task.default.maxvm). This is the default RAM task-limit 
-   	  associated with a task. Unless overridden by a job's setting, this 
-   	  number defines the RAM task-limit.
-
-      If this property is missing, or set to an invalid value, scheduling 
-      based on physical memory, RAM, is disabled.  
-    </description>
-  </property>
-
-  <property>
-    <name>mapred.capacity-scheduler.task.limit.maxpmem</name>
-    <value>-1</value>
-    <description>Configuration that provides an upper limit on the maximum
-      physical memory that can be specified by a job. The job configuration
-      mapred.task.maxpmem should be less than this value. If not, the job will
-      be rejected by the scheduler.
-      
-      If it is set to -1, scheduler will not consider physical memory for
-      scheduling even if virtual memory based scheduling is enabled(by setting
-      valid values for both mapred.task.default.maxvmem and
-      mapred.task.limit.maxvmem).
-    </description>
-  </property>
   
   <property>
     <name>mapred.capacity-scheduler.default-minimum-user-limit-percent</name>

+ 0 - 40
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java

@@ -351,44 +351,4 @@ class CapacitySchedulerConf {
     rmConf.setInt(
         "mapred.capacity-scheduler.init-worker-threads", poolSize);
   }
-
-  /**
-   * Get the upper limit on the maximum physical memory that can be specified by
-   * a job.
-   * 
-   * @return upper limit for max pmem for tasks.
-   */
-  public long getLimitMaxPmemForTasks() {
-    return rmConf.getLong(UPPER_LIMIT_ON_TASK_PMEM_PROPERTY,
-        JobConf.DISABLED_MEMORY_LIMIT);
-  }
-
-  /**
-   * Get the upper limit on the maximum physical memory that can be specified by
-   * a job.
-   * 
-   * @param value
-   */
-  public void setLimitMaxPmemForTasks(long value) {
-    rmConf.setLong(UPPER_LIMIT_ON_TASK_PMEM_PROPERTY, value);
-  }
-
-  /**
-   * Get cluster-wide default percentage of pmem in vmem.
-   * 
-   * @return cluster-wide default percentage of pmem in vmem.
-   */
-  public float getDefaultPercentOfPmemInVmem() {
-    return rmConf.getFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY,
-        JobConf.DISABLED_MEMORY_LIMIT);
-  }
-
-  /**
-   * Set cluster-wide default percentage of pmem in vmem.
-   * 
-   * @param value
-   */
-  public void setDefaultPercentOfPmemInVmem(float value) {
-    rmConf.setFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY, value);
-  }
 }

+ 62 - 91
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java

@@ -278,11 +278,7 @@ class CapacityTaskScheduler extends TaskScheduler {
 
     /** our TaskScheduler object */
     protected CapacityTaskScheduler scheduler;
-    // can be replaced with a global type, if we have one
-    protected static enum TYPE {
-      MAP, REDUCE
-    }
-    protected TYPE type = null;
+    protected CapacityTaskScheduler.TYPE type = null;
 
     abstract Task obtainNewTask(TaskTrackerStatus taskTracker, 
         JobInProgress job) throws IOException; 
@@ -413,7 +409,8 @@ class CapacityTaskScheduler extends TaskScheduler {
         //If this job meets memory requirements. Ask the JobInProgress for
         //a task to be scheduled on the task tracker.
         //if we find a job then we pass it on.
-        if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
+        if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
+            taskTracker)) {
           // We found a suitable job. Get task from it.
           Task t = obtainNewTask(taskTracker, j);
           //if there is a task return it immediately.
@@ -422,6 +419,8 @@ class CapacityTaskScheduler extends TaskScheduler {
             return TaskLookupResult.getTaskFoundResult(t);
           } else {
             //skip to the next job in the queue.
+            LOG.debug("Job " + j.getJobID().toString()
+                + " returned no tasks of type " + type);
             continue;
           }
         } else {
@@ -456,7 +455,8 @@ class CapacityTaskScheduler extends TaskScheduler {
         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
           continue;
         }
-        if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
+        if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
+            taskTracker)) {
           // We found a suitable job. Get task from it.
           Task t = obtainNewTask(taskTracker, j);
           //if there is a task return it immediately.
@@ -561,7 +561,7 @@ class CapacityTaskScheduler extends TaskScheduler {
   private static class MapSchedulingMgr extends TaskSchedulingMgr {
     MapSchedulingMgr(CapacityTaskScheduler dad) {
       super(dad);
-      type = TaskSchedulingMgr.TYPE.MAP;
+      type = CapacityTaskScheduler.TYPE.MAP;
       queueComparator = mapComparator;
     }
     Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
@@ -603,7 +603,7 @@ class CapacityTaskScheduler extends TaskScheduler {
   private static class ReduceSchedulingMgr extends TaskSchedulingMgr {
     ReduceSchedulingMgr(CapacityTaskScheduler dad) {
       super(dad);
-      type = TaskSchedulingMgr.TYPE.REDUCE;
+      type = CapacityTaskScheduler.TYPE.REDUCE;
       queueComparator = reduceComparator;
     }
     Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
@@ -664,13 +664,18 @@ class CapacityTaskScheduler extends TaskScheduler {
       return System.currentTimeMillis();
     }
   }
+  // can be replaced with a global type, if we have one
+  protected static enum TYPE {
+    MAP, REDUCE
+  }
+
   private Clock clock;
   private JobInitializationPoller initializationPoller;
 
-  long limitMaxVmemForTasks;
-  long limitMaxPmemForTasks;
-  long defaultMaxVmPerTask;
-  float defaultPercentOfPmemInVmem;
+  private long memSizeForMapSlotOnJT;
+  private long memSizeForReduceSlotOnJT;
+  private long limitMaxMemForMapTasks;
+  private long limitMaxMemForReduceTasks;
 
   public CapacityTaskScheduler() {
     this(new Clock());
@@ -687,37 +692,45 @@ class CapacityTaskScheduler extends TaskScheduler {
     this.schedConf = conf;
   }
 
-  /**
-   * Normalize the negative values in configuration
-   * 
-   * @param val
-   * @return normalized value
-   */
-  private long normalizeMemoryConfigValue(long val) {
-    if (val < 0) {
-      val = JobConf.DISABLED_MEMORY_LIMIT;
-    }
-    return val;
-  }
-
   private void initializeMemoryRelatedConf() {
-    limitMaxVmemForTasks =
-        normalizeMemoryConfigValue(conf.getLong(
-            JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+    memSizeForMapSlotOnJT =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
             JobConf.DISABLED_MEMORY_LIMIT));
+    memSizeForReduceSlotOnJT =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    limitMaxMemForMapTasks =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    limitMaxMemForReduceTasks =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    LOG.info(new StringBuilder().append("Scheduler configured with ").append(
+        "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT,").append(
+        " limitMaxMemForMapTasks, limitMaxMemForReduceTasks)").append(
+        memSizeForMapSlotOnJT).append(", ").append(memSizeForReduceSlotOnJT)
+        .append(", ").append(limitMaxMemForMapTasks).append(", ").append(
+            limitMaxMemForReduceTasks).append(")"));
+  }
 
-    limitMaxPmemForTasks =
-        normalizeMemoryConfigValue(schedConf.getLimitMaxPmemForTasks());
+  long getMemSizeForMapSlot() {
+    return memSizeForMapSlotOnJT;
+  }
 
-    defaultMaxVmPerTask =
-        normalizeMemoryConfigValue(conf.getLong(
-            JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
+  long getMemSizeForReduceSlot() {
+    return memSizeForReduceSlotOnJT;
+  }
 
-    defaultPercentOfPmemInVmem = schedConf.getDefaultPercentOfPmemInVmem();
-    if (defaultPercentOfPmemInVmem < 0) {
-      defaultPercentOfPmemInVmem = JobConf.DISABLED_MEMORY_LIMIT;
-    }
+  long getLimitMaxMemForMapSlot() {
+    return limitMaxMemForMapTasks;
+  }
+
+  long getLimitMaxMemForReduceSlot() {
+    return limitMaxMemForReduceTasks;
   }
 
   @Override
@@ -955,14 +968,12 @@ class CapacityTaskScheduler extends TaskScheduler {
         // found a task; return
         return Collections.singletonList(tlr.getTask());
       }
-      else if (TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT == 
-        tlr.getLookUpStatus()) {
-        // return no task
-        return null;
-      }
       // if we didn't get any, look at map tasks, if TT has space
-      else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND == 
-        tlr.getLookUpStatus()) && (maxMapTasks > currentMapTasks)) {
+      else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
+                                  == tlr.getLookUpStatus() ||
+                TaskLookupResult.LookUpStatus.NO_TASK_FOUND
+                                  == tlr.getLookUpStatus())
+          && (maxMapTasks > currentMapTasks)) {
         mapScheduler.updateCollectionOfQSIs();
         tlr = mapScheduler.assignTasks(taskTracker);
         if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
@@ -980,13 +991,12 @@ class CapacityTaskScheduler extends TaskScheduler {
         // found a task; return
         return Collections.singletonList(tlr.getTask());
       }
-      else if (TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT == 
-        tlr.getLookUpStatus()) {
-        return null;
-      }
       // if we didn't get any, look at reduce tasks, if TT has space
-      else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND == 
-        tlr.getLookUpStatus()) && (maxReduceTasks > currentReduceTasks)) {
+      else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
+                                    == tlr.getLookUpStatus()
+                || TaskLookupResult.LookUpStatus.NO_TASK_FOUND
+                                    == tlr.getLookUpStatus())
+          && (maxReduceTasks > currentReduceTasks)) {
         reduceScheduler.updateCollectionOfQSIs();
         tlr = reduceScheduler.assignTasks(taskTracker);
         if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
@@ -999,38 +1009,6 @@ class CapacityTaskScheduler extends TaskScheduler {
     return null;
   }
 
-  /**
-   * Kill the job if it has invalid requirements and return why it is killed
-   * 
-   * @param job
-   * @return string mentioning why the job is killed. Null if the job has valid
-   *         requirements.
-   */
-  private String killJobIfInvalidRequirements(JobInProgress job) {
-    if (!memoryMatcher.isSchedulingBasedOnVmemEnabled()) {
-      return null;
-    }
-    if ((job.getMaxVirtualMemoryForTask() > limitMaxVmemForTasks)
-        || (memoryMatcher.isSchedulingBasedOnPmemEnabled() && (job
-            .getMaxPhysicalMemoryForTask() > limitMaxPmemForTasks))) {
-      String msg =
-          job.getJobID() + " (" + job.getMaxVirtualMemoryForTask() + "vmem, "
-              + job.getMaxPhysicalMemoryForTask()
-              + "pmem) exceeds the cluster's max-memory-limits ("
-              + limitMaxVmemForTasks + "vmem, " + limitMaxPmemForTasks
-              + "pmem). Cannot run in this cluster, so killing it.";
-      LOG.warn(msg);
-      try {
-        taskTrackerManager.killJob(job.getJobID());
-        return msg;
-      } catch (IOException ioe) {
-        LOG.warn("Failed to kill the job " + job.getJobID() + ". Reason : "
-            + StringUtils.stringifyException(ioe));
-      }
-    }
-    return null;
-  }
-
   // called when a job is added
   synchronized void jobAdded(JobInProgress job) throws IOException {
     QueueSchedulingInfo qsi = 
@@ -1050,13 +1028,6 @@ class CapacityTaskScheduler extends TaskScheduler {
     qsi.numJobsByUser.put(job.getProfile().getUser(), i);
     LOG.debug("Job " + job.getJobID().toString() + " is added under user " 
               + job.getProfile().getUser() + ", user now has " + i + " jobs");
-
-    // Kill the job if it cannot run in the cluster because of invalid
-    // resource requirements.
-    String statusMsg = killJobIfInvalidRequirements(job);
-    if (statusMsg != null) {
-      throw new IOException(statusMsg);
-    }
   }
 
   // called when a job completes

+ 77 - 180
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java

@@ -30,111 +30,33 @@ class MemoryMatcher {
     this.scheduler = capacityTaskScheduler;
   }
 
-  boolean isSchedulingBasedOnVmemEnabled() {
-    LOG.debug("defaultMaxVmPerTask : " + scheduler.defaultMaxVmPerTask
-        + " limitMaxVmemForTasks : " + scheduler.limitMaxVmemForTasks);
-    if (scheduler.defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT
-        || scheduler.limitMaxVmemForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
+  boolean isSchedulingBasedOnMemEnabled() {
+    if (scheduler.getLimitMaxMemForMapSlot()
+                                  == JobConf.DISABLED_MEMORY_LIMIT
+        || scheduler.getLimitMaxMemForReduceSlot()
+                                  == JobConf.DISABLED_MEMORY_LIMIT
+        || scheduler.getMemSizeForMapSlot()
+                                  == JobConf.DISABLED_MEMORY_LIMIT
+        || scheduler.getMemSizeForReduceSlot()
+                                  == JobConf.DISABLED_MEMORY_LIMIT) {
       return false;
     }
     return true;
   }
 
-  boolean isSchedulingBasedOnPmemEnabled() {
-    LOG.debug("defaultPercentOfPmemInVmem : "
-        + scheduler.defaultPercentOfPmemInVmem + " limitMaxPmemForTasks : "
-        + scheduler.limitMaxPmemForTasks);
-    if (scheduler.defaultPercentOfPmemInVmem == JobConf.DISABLED_MEMORY_LIMIT
-        || scheduler.limitMaxPmemForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Obtain the virtual memory allocated for a job's tasks.
-   * 
-   * If the job has a configured value for the max-virtual memory, that will be
-   * returned. Else, the cluster-wide default max-virtual memory for tasks is
-   * returned.
-   * 
-   * This method can only be called after
-   * {@link CapacityTaskScheduler#initializeMemoryRelatedConf()} is invoked.
-   * 
-   * @param jConf JobConf of the job
-   * @return the virtual memory allocated for the job's tasks.
-   */
-  private long getVirtualMemoryForTask(JobConf jConf) {
-    long vMemForTask = jConf.getMaxVirtualMemoryForTask();
-    if (vMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      vMemForTask =
-          new JobConf().getLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-              scheduler.defaultMaxVmPerTask);
-    }
-    return vMemForTask;
-  }
-
-  /**
-   * Obtain the physical memory allocated for a job's tasks.
-   * 
-   * If the job has a configured value for the max physical memory, that
-   * will be returned. Else, the cluster-wide default physical memory for
-   * tasks is returned.
-   * 
-   * This method can only be called after
-   * {@link CapacityTaskScheduler#initializeMemoryRelatedConf()} is invoked.
-   * 
-   * @param jConf JobConf of the job
-   * @return the physical memory allocated for the job's tasks
-   */
-  private long getPhysicalMemoryForTask(JobConf jConf) {
-    long pMemForTask = jConf.getMaxPhysicalMemoryForTask();
-    if (pMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      pMemForTask =
-          Math.round(getVirtualMemoryForTask(jConf)
-              * scheduler.defaultPercentOfPmemInVmem);
-    }
-    return pMemForTask;
-  }
-
-  static class Memory {
-    long vmem;
-    long pmem;
-
-    Memory(long vm, long pm) {
-      this.vmem = vm;
-      this.pmem = pm;
-    }
-  }
-
   /**
    * Find the memory that is already used by all the running tasks
    * residing on the given TaskTracker.
    * 
    * @param taskTracker
+   * @param taskType type of task (MAP or REDUCE) to account memory for
    * @return amount of memory that is used by the residing tasks,
    *          null if memory cannot be computed for some reason.
    */
-  private synchronized Memory getMemReservedForTasks(
-      TaskTrackerStatus taskTracker) {
-    boolean disabledVmem = false;
-    boolean disabledPmem = false;
-
-    if (scheduler.defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      disabledVmem = true;
-    }
-
-    if (scheduler.defaultPercentOfPmemInVmem == JobConf.DISABLED_MEMORY_LIMIT) {
-      disabledPmem = true;
-    }
-
-    if (disabledVmem && disabledPmem) {
-      return new Memory(JobConf.DISABLED_MEMORY_LIMIT,
-          JobConf.DISABLED_MEMORY_LIMIT);
-    }
-
+  private synchronized Long getMemReservedForTasks(
+      TaskTrackerStatus taskTracker, CapacityTaskScheduler.TYPE taskType) {
     long vmem = 0;
-    long pmem = 0;
+    long myVmem = 0;
 
     for (TaskStatus task : taskTracker.getTaskReports()) {
       // the following task states are one in which the slot is
@@ -142,12 +64,12 @@ class MemoryMatcher {
       // accounted in used memory.
       if ((task.getRunState() == TaskStatus.State.RUNNING)
           || (task.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
-        JobInProgress job = scheduler.taskTrackerManager.getJob(
-                                              task.getTaskID().getJobID());
+        JobInProgress job =
+            scheduler.taskTrackerManager.getJob(task.getTaskID().getJobID());
         if (job == null) {
           // This scenario can happen if a job was completed/killed
-          // and retired from JT's memory. In this state, we can ignore 
-          // the running task status and compute memory for the rest of 
+          // and retired from JT's memory. In this state, we can ignore
+          // the running task status and compute memory for the rest of
           // the tasks. However, any scheduling done with this computation
           // could result in over-subscribing of memory for tasks on this
           // TT (as the unaccounted for task is still running).
@@ -155,123 +77,98 @@ class MemoryMatcher {
           // One of the ways of doing that is to return null from here
           // and check for null in the calling method.
           LOG.info("Task tracker: " + taskTracker.getHost() + " is reporting "
-                    + "a running / commit pending task: " + task.getTaskID()
-                    + " but no corresponding job was found. "
-                    + "Maybe job was retired. Not computing "
-                    + "memory values for this TT.");
+              + "a running / commit pending task: " + task.getTaskID()
+              + " but no corresponding job was found. "
+              + "Maybe job was retired. Not computing "
+              + "memory values for this TT.");
           return null;
         }
-        
-        JobConf jConf =
-            scheduler.taskTrackerManager.getJob(task.getTaskID().getJobID())
-                .getJobConf();
-        if (!disabledVmem) {
-          vmem += getVirtualMemoryForTask(jConf);
-        }
-        if (!disabledPmem) {
-          pmem += getPhysicalMemoryForTask(jConf);
+
+        JobConf jConf = job.getJobConf();
+
+        // Get the memory "allotted" for this task by rounding up the job's
+        // tasks' memory limits to the next multiple of the slot-memory-size
+        // set on JT. This essentially translates to tasks of a high memory job
+        // using multiple slots.
+        if (task.getIsMap() && taskType.equals(CapacityTaskScheduler.TYPE.MAP)) {
+          myVmem = jConf.getMemoryForMapTask();
+          myVmem =
+              (long) (scheduler.getMemSizeForMapSlot() * Math
+                  .ceil((float) myVmem
+                      / (float) scheduler.getMemSizeForMapSlot()));
+        } else if (!task.getIsMap()
+            && taskType.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+          myVmem = jConf.getMemoryForReduceTask();
+          myVmem =
+              (long) (scheduler.getMemSizeForReduceSlot() * Math
+                  .ceil((float) myVmem
+                      / (float) scheduler.getMemSizeForReduceSlot()));
         }
+        vmem += myVmem;
       }
     }
 
-    return new Memory(vmem, pmem);
+    return Long.valueOf(vmem);
   }
 
   /**
-   * Check if a TT has enough pmem and vmem to run this job.
+   * Check if a TT has enough memory to run a task of the given type from this job.
    * @param job
+   * @param taskType type of task (MAP or REDUCE) to be scheduled
    * @param taskTracker
    * @return true if this TT has enough memory for this job. False otherwise.
    */
   boolean matchesMemoryRequirements(JobInProgress job,
-      TaskTrackerStatus taskTracker) {
+      CapacityTaskScheduler.TYPE taskType, TaskTrackerStatus taskTracker) {
 
-    // ////////////// vmem based scheduling
-    if (!isSchedulingBasedOnVmemEnabled()) {
-      LOG.debug("One of the configuration parameters defaultMaxVmPerTask "
-          + "and limitMaxVmemPerTasks is not configured. Scheduling based "
-          + "on job's memory requirements is disabled, ignoring any value "
-          + "set by job.");
-      return true;
-    }
+    LOG.debug("Matching memory requirements of " + job.getJobID().toString()
+        + " for scheduling on " + taskTracker.trackerName);
 
-    TaskTrackerStatus.ResourceStatus resourceStatus =
-        taskTracker.getResourceStatus();
-    long totalVMemOnTT = resourceStatus.getTotalVirtualMemory();
-    long reservedVMemOnTT = resourceStatus.getReservedTotalMemory();
-
-    if (totalVMemOnTT == JobConf.DISABLED_MEMORY_LIMIT
-        || reservedVMemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
+    if (!isSchedulingBasedOnMemEnabled()) {
+      LOG.debug("Scheduling based on job's memory requirements is disabled."
+          + " Ignoring any value set by job.");
       return true;
     }
 
-    if (reservedVMemOnTT > totalVMemOnTT) {
-      return true;
-    }
-
-    long jobVMemForTask = job.getMaxVirtualMemoryForTask();
-    if (jobVMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      jobVMemForTask = scheduler.defaultMaxVmPerTask;
-    }
-
-    Memory memReservedForTasks = getMemReservedForTasks(taskTracker);
-    if (memReservedForTasks == null) {
+    Long memUsedOnTT = getMemReservedForTasks(taskTracker, taskType);
+    if (memUsedOnTT == null) {
       // For some reason, maybe because we could not find the job
       // corresponding to a running task (as can happen if the job
       // is retired in between), we could not compute the memory state
       // on this TT. Treat this as an error, and fail memory
       // requirements.
-      LOG.info("Could not compute memory for taskTracker: " 
-                + taskTracker.getHost() + ". Failing memory requirements.");
+      LOG.info("Could not compute memory for taskTracker: "
+          + taskTracker.getHost() + ". Failing memory requirements.");
       return false;
     }
-    long vmemUsedOnTT = memReservedForTasks.vmem;
-    long pmemUsedOnTT = memReservedForTasks.pmem;
 
-    long freeVmemUsedOnTT = totalVMemOnTT - vmemUsedOnTT - reservedVMemOnTT;
+    long totalMemUsableOnTT = 0;
 
-    if (jobVMemForTask > freeVmemUsedOnTT) {
-      return false;
+    long memForThisTask = 0;
+    if (taskType.equals(CapacityTaskScheduler.TYPE.MAP)) {
+      memForThisTask = job.getJobConf().getMemoryForMapTask();
+      totalMemUsableOnTT =
+          scheduler.getMemSizeForMapSlot() * taskTracker.getMaxMapTasks();
+    } else if (taskType.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+      memForThisTask = job.getJobConf().getMemoryForReduceTask();
+      totalMemUsableOnTT =
+          scheduler.getMemSizeForReduceSlot()
+              * taskTracker.getMaxReduceTasks();
     }
 
-    // ////////////// pmem based scheduling
-
-    long totalPmemOnTT = resourceStatus.getTotalPhysicalMemory();
-    long reservedPmemOnTT = resourceStatus.getReservedPhysicalMemory();
-    long jobPMemForTask = job.getMaxPhysicalMemoryForTask();
-    long freePmemUsedOnTT = 0;
-
-    if (isSchedulingBasedOnPmemEnabled()) {
-      if (totalPmemOnTT == JobConf.DISABLED_MEMORY_LIMIT
-          || reservedPmemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
-        return true;
-      }
-
-      if (reservedPmemOnTT > totalPmemOnTT) {
-        return true;
-      }
-
-      if (jobPMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
-        jobPMemForTask =
-            Math.round(jobVMemForTask * scheduler.defaultPercentOfPmemInVmem);
-      }
-
-      freePmemUsedOnTT = totalPmemOnTT - pmemUsedOnTT - reservedPmemOnTT;
-
-      if (jobPMemForTask > freePmemUsedOnTT) {
-        return false;
-      }
-    } else {
-      LOG.debug("One of the configuration parameters "
-          + "defaultPercentOfPmemInVmem and limitMaxPmemPerTasks is not "
-          + "configured. Scheduling based on job's physical memory "
-          + "requirements is disabled, ignoring any value set by job.");
+    long freeMemOnTT = totalMemUsableOnTT - memUsedOnTT.longValue();
+    if (memForThisTask > freeMemOnTT) {
+      LOG.debug("memForThisTask (" + memForThisTask + ") > freeMemOnTT ("
+          + freeMemOnTT + "). A " + taskType + " task from "
+          + job.getJobID().toString() + " cannot be scheduled on TT "
+          + taskTracker.trackerName);
+      return false;
     }
 
-    LOG.debug("freeVMemOnTT = " + freeVmemUsedOnTT + " totalVMemOnTT = "
-        + totalVMemOnTT + " freePMemOnTT = " + freePmemUsedOnTT
-        + " totalPMemOnTT = " + totalPmemOnTT + " jobVMemForTask = "
-        + jobVMemForTask + " jobPMemForTask = " + jobPMemForTask);
+    LOG.debug("memForThisTask = " + memForThisTask + ". freeMemOnTT = "
+        + freeMemOnTT + ". A " + taskType.toString() + " task from "
+        + job.getJobID().toString() + " matches memory requirements on TT "
+        + taskTracker.trackerName);
     return true;
   }
 }

+ 135 - 403
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -182,8 +182,6 @@ public class TestCapacityScheduler extends TestCase {
       }
       mapTaskCtr = 0;
       redTaskCtr = 0;
-      super.setMaxVirtualMemoryForTask(jobConf.getMaxVirtualMemoryForTask());
-      super.setMaxPhysicalMemoryForTask(jobConf.getMaxPhysicalMemoryForTask());
     }
     
     @Override
@@ -232,7 +230,7 @@ public class TestCapacityScheduler extends TestCase {
       }
       return task;
     }
-    
+
     @Override
     public Task obtainNewReduceTask(final TaskTrackerStatus tts,
         int clusterSize, int ignored) throws IOException {
@@ -727,7 +725,7 @@ public class TestCapacityScheduler extends TestCase {
   private FakeJobInProgress submitJob(int state, JobConf jobConf) throws IOException {
     FakeJobInProgress job =
         new FakeJobInProgress(new JobID("test", ++jobCounter),
-            (jobConf == null ? new JobConf() : jobConf), taskTrackerManager,
+            (jobConf == null ? new JobConf(conf) : jobConf), taskTrackerManager,
             jobConf.getUser());
     job.getStatus().setRunState(state);
     taskTrackerManager.submitJob(job);
@@ -1498,12 +1496,6 @@ public class TestCapacityScheduler extends TestCase {
     LOG.debug("Starting the scheduler.");
     taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
 
-    // Limited TT - 1GB vmem and 512MB pmem
-    taskTrackerManager.getTaskTracker("tt1").getResourceStatus()
-        .setTotalVirtualMemory(1 * 1024 * 1024 * 1024L);
-    taskTrackerManager.getTaskTracker("tt1").getResourceStatus()
-        .setTotalPhysicalMemory(512 * 1024 * 1024L);
-
     taskTrackerManager.addQueues(new String[] { "default" });
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
@@ -1513,11 +1505,11 @@ public class TestCapacityScheduler extends TestCase {
     // memory-based scheduling disabled by default.
     scheduler.start();
 
-    LOG.debug("Submit one high memory(3GB vmem, 1GBpmem) job of 1 map task "
-        + "and 1 reduce task.");
+    LOG.debug("Submit one high memory job of 1 3GB map task "
+        + "and 1 1GB reduce task.");
     JobConf jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(3 * 1024 * 1024 * 1024L); // 3GB vmem
-    jConf.setMaxPhysicalMemoryForTask(1 * 1024 * 1024 * 1024L); // 1 GB pmem
+    jConf.setMemoryForMapTask(3 * 1024L); // 3GB
+    jConf.setMemoryForReduceTask(1 * 1024L); // 1 GB
     jConf.setNumMapTasks(1);
     jConf.setNumReduceTasks(1);
     jConf.setQueueName("default");
@@ -1532,197 +1524,57 @@ public class TestCapacityScheduler extends TestCase {
   }
 
   /**
-   * Test to verify that highPmemJobs are scheduled like all other jobs when
-   * physical-memory based scheduling is not enabled.
-   * @throws IOException
-   */
-  public void testDisabledPmemBasedScheduling()
-      throws IOException {
-
-    LOG.debug("Starting the scheduler.");
-    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
-
-    // Limited TT - 100GB vmem and 500MB pmem
-    TaskTrackerStatus.ResourceStatus ttStatus =
-        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
-    ttStatus.setTotalVirtualMemory(100 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(500 * 1024 * 1024L);
-    ttStatus.setReservedPhysicalMemory(0);
-
-    taskTrackerManager.addQueues(new String[] { "default" });
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
-    resConf.setFakeQueues(queues);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.setTaskTrackerManager(taskTrackerManager);
-    // enable vmem-based scheduling. pmem based scheduling disabled by default.
-    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        1536 * 1024 * 1024L);
-    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        3 * 1024 * 1024 * 1024L);
-    scheduler.start();
-
-    LOG.debug("Submit one high pmem(3GB vmem, 1GBpmem) job of 1 map task "
-        + "and 1 reduce task.");
-    JobConf jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(3 * 1024 * 1024 * 1024L); // 3GB vmem
-    jConf.setMaxPhysicalMemoryForTask(1 * 1024 * 1024 * 1024L); // 1 GB pmem
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(1);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    submitJobAndInit(JobStatus.RUNNING, jConf);
-
-    // assert that all tasks are launched even though they transgress the
-    // scheduling limits.
-
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
-  }
-
-  /**
-   * Test HighMemoryJobs.
-   * @throws IOException
-   */
-  public void testHighMemoryJobs()
-      throws IOException {
-
-    LOG.debug("Starting the scheduler.");
-    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
-
-    TaskTrackerStatus.ResourceStatus ttStatus =
-        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
-    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedPhysicalMemory(0);
-    // Normal job on this TT would be 1.5GB vmem, 0.5GB pmem
-
-    taskTrackerManager.addQueues(new String[] { "default" });
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
-    resConf.setFakeQueues(queues);
-    scheduler.setTaskTrackerManager(taskTrackerManager);
-    // enabled memory-based scheduling
-    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        1536 * 1024 * 1024L);
-    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        3 * 1024 * 1024 * 1024L);
-    resConf.setDefaultPercentOfPmemInVmem(33.3f);
-    resConf.setLimitMaxPmemForTasks(1 * 1024 * 1024 * 1024L);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.start();
-
-    LOG.debug("Submit one high memory(1600MB vmem, 400MB pmem) job of "
-        + "1 map task and 1 reduce task.");
-    JobConf jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(1600 * 1024 * 1024L); // 1.6GB vmem
-    jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(1);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    jConf.setMapSpeculativeExecution(false);
-    jConf.setReduceSpeculativeExecution(false);
-    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-
-    // No more tasks of this job can run on the TT because of lack of vmem
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-
-    // Let attempt_test_0001_m_000001_0 finish, task assignment should succeed.
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job1);
-    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
-
-    LOG.debug("Submit another high memory(1200MB vmem, 800MB pmem) job of "
-        + "1 map task and 0 reduces.");
-    jConf.setMaxVirtualMemoryForTask(1200 * 1024 * 1024L);
-    jConf.setMaxPhysicalMemoryForTask(800 * 1024 * 1024L);
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(0);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    jConf.setMapSpeculativeExecution(false);
-    jConf.setReduceSpeculativeExecution(false);
-    submitJobAndInit(JobStatus.PREP, jConf); // job2
-
-    // This job shouldn't run the TT now because of lack of pmem
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-
-    // Let attempt_test_0001_m_000002_0 finish, task assignment should succeed.
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", job1);
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-
-    LOG.debug("Submit a normal memory(200MB vmem, 100MB pmem) job of "
-        + "0 maps and 1 reduce task.");
-    jConf.setMaxVirtualMemoryForTask(200 * 1024 * 1024L);
-    jConf.setMaxPhysicalMemoryForTask(100 * 1024 * 1024L);
-    jConf.setNumMapTasks(0);
-    jConf.setNumReduceTasks(1);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    submitJobAndInit(JobStatus.PREP, jConf); // job3
-
-    checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
-  }
-
-  /**
-   * Test HADOOP-4979. 
-   * Bug fix for making sure we always return null to TT if there is a 
-   * high-mem job, and not look at reduce jobs (if map tasks are high-mem)
-   * or vice-versa.
+   * Test reverting HADOOP-4979. If there is a high-mem job, we should now look
+   * at reduce jobs (if map tasks are high-mem) or vice-versa.
+   * 
    * @throws IOException
    */
-  public void testHighMemoryBlocking()
+  public void testHighMemoryBlockingAcrossTaskTypes()
       throws IOException {
 
     // 2 map and 1 reduce slots
     taskTrackerManager = new FakeTaskTrackerManager(1, 2, 1);
 
-    TaskTrackerStatus.ResourceStatus ttStatus =
-        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
-    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L);
-    ttStatus.setReservedPhysicalMemory(0);
-    // Normal job on this TT would be 1GB vmem, 0.5GB pmem
-
     taskTrackerManager.addQueues(new String[] { "default" });
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
-    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        1 * 1024 * 1024 * 1024L);
-    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        3 * 1024 * 1024 * 1024L);
-    resConf.setDefaultPercentOfPmemInVmem(33.3f);
-    resConf.setLimitMaxPmemForTasks(1536 * 1024 * 1024L);
+    // Normal job in the cluster would be 1GB maps/reduces
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+        2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+        1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
-    // We need a situation where the scheduler needs to run a map task, 
-    // but the available one has a high-mem requirement. There should
-    // be another job whose maps or reduces can run, but they shouldn't 
-    // be scheduled.
+    // The situation: two jobs in the queue. The first is a high memory job
+    // with only maps and no reduces; the second is a normal job with both maps and reduces.
+    // The first job cannot run for want of memory for maps, so the second job's reduces should run.
     
-    LOG.debug("Submit one high memory(2GB vmem, 400MB pmem) job of "
+    LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
         + "2 map tasks");
-    JobConf jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem
-    jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
+    JobConf jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(0);
     jConf.setNumMapTasks(2);
     jConf.setNumReduceTasks(0);
     jConf.setQueueName("default");
     jConf.setUser("u1");
     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
-    LOG.debug("Submit another regular memory(900MB vmem, 200MB pmem) job of "
+
+    LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
         + "2 map/red tasks");
-    jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(900 * 1024 * 1024L); // 900MB vmem
-    jConf.setMaxPhysicalMemoryForTask(200 * 1024 * 1024L); // 200MB pmem
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
     jConf.setNumMapTasks(2);
     jConf.setNumReduceTasks(2);
     jConf.setQueueName("default");
@@ -1732,75 +1584,8 @@ public class TestCapacityScheduler extends TestCase {
     // first, a map from j1 will run
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     // at this point, the scheduler tries to schedule another map from j1. 
-    // there isn't enough space. There is space to run the second job's
-    // map or reduce task, but they shouldn't be scheduled
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-  }
-  
-  /**
-   * test invalid highMemoryJobs
-   * @throws IOException
-   */
-  public void testHighMemoryJobWithInvalidRequirements()
-      throws IOException {
-    LOG.debug("Starting the scheduler.");
-    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
-    TaskTrackerStatus.ResourceStatus ttStatus =
-        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
-    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024);
-    ttStatus.setReservedPhysicalMemory(0);
-
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
-    taskTrackerManager.addQueues(new String[] { "default" });
-    resConf.setFakeQueues(queues);
-    scheduler.setTaskTrackerManager(taskTrackerManager);
-    // enabled memory-based scheduling
-    long vmemUpperLimit = 1 * 1024 * 1024 * 1024L;
-    long vmemDefault = 1536 * 1024 * 1024L;
-    long pmemUpperLimit = vmemUpperLimit;
-    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        vmemDefault);
-    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        vmemUpperLimit);
-    resConf.setDefaultPercentOfPmemInVmem(33.3f);
-    resConf.setLimitMaxPmemForTasks(pmemUpperLimit);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.start();
-
-    LOG.debug("Submit one invalid high ram(5GB vmem, 3GB pmem) job of "
-        + "1 map, 0 reduce tasks.");
-    long jobMaxVmem = 5 * 1024 * 1024 * 1024L;
-    long jobMaxPmem = 3 * 1024 * 1024 * 1024L;
-    JobConf jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(jobMaxVmem);
-    jConf.setMaxPhysicalMemoryForTask(jobMaxPmem);
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(0);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-
-    boolean throwsException = false;
-    String msg = null;
-    FakeJobInProgress job;
-    try {
-      job = submitJob(JobStatus.PREP, jConf);
-    } catch (IOException ioe) {
-      // job has to fail
-      throwsException = true;
-      msg = ioe.getMessage();
-    }
-
-    assertTrue(throwsException);
-    job = (FakeJobInProgress) taskTrackerManager.getJobs().toArray()[0];
-    assertTrue(msg.matches(job.getJobID() + " \\(" + jobMaxVmem + "vmem, "
-        + jobMaxPmem + "pmem\\) exceeds the cluster's max-memory-limits \\("
-        + vmemUpperLimit + "vmem, " + pmemUpperLimit
-        + "pmem\\). Cannot run in this cluster, so killing it."));
-    // For job, no cleanup task needed so gets killed immediately.
-    assertTrue(job.getStatus().getRunState() == JobStatus.KILLED);
+    // there isn't enough space. The second job's reduce should be scheduled.
+    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
   }
 
   /**
@@ -1811,13 +1596,7 @@ public class TestCapacityScheduler extends TestCase {
       throws IOException {
 
     LOG.debug("Starting the scheduler.");
-    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
-    TaskTrackerStatus.ResourceStatus ttStatus =
-        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
-    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedPhysicalMemory(0);
+    taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
 
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
@@ -1825,68 +1604,65 @@ public class TestCapacityScheduler extends TestCase {
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
-    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        1536 * 1024 * 1024L);
-    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        4 * 1024 * 1024 * 1024L);
-    resConf.setDefaultPercentOfPmemInVmem(33.3f);
-    resConf.setLimitMaxPmemForTasks(2 * 1024 * 1024 * 1024L);
+    // Normal jobs 1GB maps/reduces. 2GB limit on maps/reduces
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+        2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+        2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
-    LOG.debug("Submit one high memory(4GB vmem, 512MB pmem) job of "
+    LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
         + "1 map, 0 reduce tasks.");
-    JobConf jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(4 * 1024 * 1024 * 1024L);
-    jConf.setMaxPhysicalMemoryForTask(512 * 1024 * 1024L);
+    JobConf jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
     jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(0);
+    jConf.setNumReduceTasks(1);
     jConf.setQueueName("default");
     jConf.setUser("u1");
     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
-    // TTs should not run these jobs i.e. cluster blocked because of lack of
-    // vmem
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    assertNull(scheduler.assignTasks(tracker("tt1")));
 
-    // Job should still be alive
-    assertTrue(job1.getStatus().getRunState() == JobStatus.RUNNING);
+    // Fill the second tt with this job.
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
 
-    LOG.debug("Submit a normal job of 1 map, 0 reduce tasks.");
-    // Use cluster-wide defaults
-    jConf.setMaxVirtualMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
-    jConf.setMaxPhysicalMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
+    LOG.debug("Submit one high memory(2GB maps/reduces) job of "
+        + "2 map, 2 reduce tasks.");
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(2 * 1024);
+    jConf.setNumMapTasks(2);
+    jConf.setNumReduceTasks(2);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
 
-    // cluster should still be blocked for job1 and so even job2 should not run
-    // even though it is a normal job
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-
-    scheduler.taskTrackerManager.killJob(job2.getJobID());
-    scheduler.taskTrackerManager.killJob(job1.getJobID());
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
 
-    LOG.debug("Submit one high memory(2GB vmem, 2GB pmem) job of "
+    LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
         + "1 map, 0 reduce tasks.");
-    jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L);
-    jConf.setMaxPhysicalMemoryForTask(2 * 1024 * 1024 * 1024L);
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(1);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
     FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
-    // TTs should not run these jobs i.e. cluster blocked because of lack of
-    // pmem now.
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    
-    // Job should still be alive
-    assertTrue(job3.getStatus().getRunState() == JobStatus.RUNNING);
 
-    LOG.debug("Submit a normal job of 1 map, 0 reduce tasks.");
-    // Use cluster-wide defaults
-    jConf.setMaxVirtualMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
-    jConf.setMaxPhysicalMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
-    submitJobAndInit(JobStatus.PREP, jConf); // job4
-
-    // cluster should still be blocked for job3 and so even job4 should not run
-    // even though it is a normal job
+    // Job2 cannot fit on tt2 or tt1. Blocking. Job3 also will not run.
     assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt2")));
   }
 
   /**
@@ -1900,13 +1676,6 @@ public class TestCapacityScheduler extends TestCase {
     // create a cluster with a single node.
     LOG.debug("Starting cluster with 1 tasktracker, 2 map and 2 reduce slots");
     taskTrackerManager = new FakeTaskTrackerManager(1, 2, 2);
-    TaskTrackerStatus.ResourceStatus ttStatus =
-        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
-    LOG.debug("Assume TT has 4 GB virtual mem and 2 GB RAM");
-    ttStatus.setTotalVirtualMemory(4 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(2 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedPhysicalMemory(0);
 
     // create scheduler
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -1915,14 +1684,17 @@ public class TestCapacityScheduler extends TestCase {
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
-    LOG.debug("By default, jobs get 0.5 GB per task vmem" +
-        " and 2 GB max vmem, with 50% of it for RAM");
-    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        512 * 1024 * 1024L);
-    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        2 * 1024 * 1024 * 1024L);
-    resConf.setDefaultPercentOfPmemInVmem(50.0f);
-    resConf.setLimitMaxPmemForTasks(1 * 1024 * 1024 * 1024L);
+    LOG.debug("Assume TT has 2GB for maps and 2GB for reduces");
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+        2 * 1024L);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 512);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+        2 * 1024L);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 512);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
     
@@ -1931,6 +1703,8 @@ public class TestCapacityScheduler extends TestCase {
     JobConf jConf = new JobConf();
     jConf.setNumMapTasks(2);
     jConf.setNumReduceTasks(2);
+    jConf.setMemoryForMapTask(512);
+    jConf.setMemoryForReduceTask(512);
     jConf.setQueueName("default");
     jConf.setUser("u1");
     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
@@ -1949,6 +1723,8 @@ public class TestCapacityScheduler extends TestCase {
     jConf = new JobConf();
     jConf.setNumMapTasks(1);
     jConf.setNumReduceTasks(1);
+    jConf.setMemoryForMapTask(512);
+    jConf.setMemoryForReduceTask(512);
     jConf.setQueueName("default");
     jConf.setUser("u1");
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
@@ -2396,21 +2172,8 @@ public class TestCapacityScheduler extends TestCase {
    */
   public void testHighRamJobWithSpeculativeExecution() throws IOException {
     // 2 map and 2 reduce slots
-    taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
-    
-    //task tracker memory configurations.
-    TaskTrackerStatus.ResourceStatus ttStatus =
-        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
-    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L);
-    ttStatus.setReservedPhysicalMemory(0);
-    ttStatus = taskTrackerManager.getTaskTracker("tt2").getResourceStatus();
-    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L);
-    ttStatus.setReservedPhysicalMemory(0);
-   
+    taskTrackerManager = new FakeTaskTrackerManager(2, 3, 3);
+    // 1GB for each map, 1GB for each reduce
 
     taskTrackerManager.addQueues(new String[] { "default" });
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -2418,19 +2181,23 @@ public class TestCapacityScheduler extends TestCase {
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
-    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        1 * 1024 * 1024 * 1024L);
-    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        3 * 1024 * 1024 * 1024L);
-    resConf.setDefaultPercentOfPmemInVmem(33.3f);
-    resConf.setLimitMaxPmemForTasks(1536 * 1024 * 1024L);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+        3 * 1024L);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024L);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+        3 * 1024L);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024L);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
-    
+    //Submit a high memory job with speculative tasks.
     JobConf jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem
-    jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(0);
     jConf.setNumMapTasks(1);
     jConf.setNumReduceTasks(0);
     jConf.setQueueName("default");
@@ -2439,20 +2206,18 @@ public class TestCapacityScheduler extends TestCase {
     jConf.setReduceSpeculativeExecution(false);
     FakeJobInProgress job1 = new FakeJobInProgress(new JobID("test", ++jobCounter),
           jConf, taskTrackerManager,"u1");
-    
-    //Submit a high memory job with speculative tasks.
     taskTrackerManager.submitJob(job1);
-    
+
+    //Submit normal job
     jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(100 * 1024 * 1024L); // 100MB vmem
-    jConf.setMaxPhysicalMemoryForTask(50 * 1024 * 1024L); // 50MB pmem
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(0);
     jConf.setNumMapTasks(1);
     jConf.setNumReduceTasks(0);
     jConf.setQueueName("default");
     jConf.setUser("u1");
     jConf.setMapSpeculativeExecution(false);
     jConf.setReduceSpeculativeExecution(false);
-    //Submit normal job
     FakeJobInProgress job2 = submitJob(JobStatus.PREP, jConf);
 
     controlledInitializationPoller.selectJobsToInitialize();
@@ -2487,8 +2252,8 @@ public class TestCapacityScheduler extends TestCase {
     
     //Now submit high ram job with speculative reduce and check.
     jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem
-    jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(2 * 1024L);
     jConf.setNumMapTasks(1);
     jConf.setNumReduceTasks(1);
     jConf.setQueueName("default");
@@ -2497,76 +2262,43 @@ public class TestCapacityScheduler extends TestCase {
     jConf.setReduceSpeculativeExecution(true);
     FakeJobInProgress job3 = new FakeJobInProgress(new JobID("test", ++jobCounter),
           jConf, taskTrackerManager,"u1");
-    
-    //Submit a high memory job with speculative reduce tasks.
     taskTrackerManager.submitJob(job3);
-    
+
+    //Submit normal job w.r.t reduces
     jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(100 * 1024 * 1024L); // 100MB vmem
-    jConf.setMaxPhysicalMemoryForTask(50 * 1024 * 1024L); // 50MB pmem
+    jConf.setMemoryForMapTask(2 * 1024L);
+    jConf.setMemoryForReduceTask(1 * 104L);
     jConf.setNumMapTasks(1);
     jConf.setNumReduceTasks(1);
     jConf.setQueueName("default");
     jConf.setUser("u1");
     jConf.setMapSpeculativeExecution(false);
     jConf.setReduceSpeculativeExecution(false);
-    //Submit normal job
     FakeJobInProgress job4 = submitJob(JobStatus.PREP, jConf);
     
     controlledInitializationPoller.selectJobsToInitialize();
     raiseStatusChangeEvents(scheduler.jobQueuesManager);
-    //all maps of jobs get assigned to same task tracker as
-    //job does not have speculative map and same tracker sends two heart
-    //beat back to back.
+
+    // Finish up scheduling the maps
     checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
-    //first map slot gets attention on this tracker.
     checkAssignment("tt2", "attempt_test_0004_m_000001_0 on tt2");
-    //now first reduce of the job3 would be scheduled on tt2 since it has
-    //memory.
-    //assigntasks() would check for free reduce slot is greater than
-    //map slots. Seeing there is more free reduce slot it would try scheduling
-    //reduce of job1 but would block as in it is a high memory task.
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    //TT2 would get the reduce task from high memory job as the tt is running
-    //normal jobs map. which is low mem.
-    checkAssignment("tt2", "attempt_test_0003_r_000001_0 on tt2");
-    // now if either TT comes back, it will block because all maps
-    // are done, and the first jobs reduce has a speculative task.
+
+    // First, a reduce from j3 will run. At this point, there is still a
+    // speculative task of the same job to be scheduled, and it will be
+    // scheduled next. Until job3's tasks have all been scheduled, no tasks
+    // from other jobs will be scheduled.
+    checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
+    assertEquals("pending reduces greater than zero " , job3.pendingMaps(), 0);
+    // Let the same tracker come back and check that it is blocked: job3
+    // still has a speculative reduce task pending, so the tracker should get
+    // nothing even though it could run job4's reduce.
     assertNull(scheduler.assignTasks(tracker("tt1")));
-    assertNull(scheduler.assignTasks(tracker("tt2"))); 
-    //finish maps.
-    taskTrackerManager.finishTask("tt1", "attempt_test_0003_m_000001_0", 
-        job3);
-    taskTrackerManager.finishTask("tt2", "attempt_test_0004_m_000001_0", 
-        job4);
-    //check speculative reduce code path is covered.
-    assertEquals("Pending reduces not zero for high " +
-    		"ram job with speculative reduce.", 0, job3.pendingReduces());
-    //if tt2 returns back it is not given any task even if it can schedule
-    //job2 reduce.
-    assertNull(scheduler.assignTasks(tracker("tt2")));
-    //speculative reduce of the job3 would be scheduled.
-    checkAssignment("tt1", "attempt_test_0003_r_000001_1 on tt1");
-    //now both speculative and actual task have been scheduled for job3.
-    //Normal task of Job4 would now be scheduled on TT1 as it has free space
-    //to run.
+    //TT2 now gets the speculative reduce of job3
+    checkAssignment("tt2", "attempt_test_0003_r_000001_1 on tt2");
+
+    // Now that j3 has no more speculative reduces pending, j4's reduce
+    // can be scheduled.
     checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
-    //No more tasks.
-    assertNull(scheduler.assignTasks(tracker("tt2")));
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    
-    //finish all the reduces.
-    taskTrackerManager.finishTask("tt1", "attempt_test_0003_r_000001_1", 
-        job3);
-    taskTrackerManager.finishTask("tt2", "attempt_test_0003_r_000001_0", 
-        job3);
-    //finish the job
-    taskTrackerManager.finalizeJob(job3);
-    //finish the task and the job.
-    taskTrackerManager.finishTask("tt1", "attempt_test_0004_r_000001_0", 
-        job4);
-    taskTrackerManager.finalizeJob(job4);
-    
   }
 
   private void checkFailedInitializedJobMovement() throws IOException {

+ 0 - 53
src/mapred/mapred-default.xml

@@ -185,42 +185,6 @@
   </description>
 </property>
 
-<property>
-  <name>mapred.tasktracker.pmem.reserved</name>
-  <value>-1</value>
-  <description>Configuration property to specify the amount of physical memory
-    that has to be reserved by the TaskTracker for system usage (OS, TT etc).
-    The reserved physical memory should be a part of the total physical memory
-    available on the TaskTracker.
-
-    The reserved physical memory and the total physical memory values are
-    reported by the TaskTracker as part of heart-beat so that they can
-    considered by a scheduler. Please refer to the documentation of the
-    configured scheduler to see how this property is used.
-  </description>
-</property>
-
-<property>
-  <name>mapred.task.default.maxvmem</name>
-  <value>-1</value>
-  <description>
-    Cluster-wide configuration in bytes to be set by the administrators that
-    provides default amount of maximum virtual memory for job's tasks. This has
-    to be set on both the JobTracker node for the sake of scheduling decisions
-    and on the TaskTracker nodes for the sake of memory management.
-
-    If a job doesn't specify its virtual memory requirement by setting
-    mapred.task.maxvmem to -1, tasks are assured a memory limit set
-    to this property. This property is set to -1 by default.
-
-    This value should in general be less than the cluster-wide
-    configuration mapred.task.limit.maxvmem. If not or if it is not set,
-    TaskTracker's memory management will be disabled and a scheduler's memory
-    based scheduling decisions may be affected. Please refer to the
-    documentation of the configured scheduler to see how this property is used.
-  </description>
-</property>
-
 <property>
   <name>mapred.task.limit.maxvmem</name>
   <value>-1</value>
@@ -271,23 +235,6 @@
   </description>
 </property>
 
-<property>
-  <name>mapred.task.maxpmem</name>
-  <value>-1</value>
-  <description>
-   The maximum amount of physical memory any task of a job will use in bytes.
-
-   This value may be used by schedulers that support scheduling based on job's
-   memory requirements. In general, a task of this job will be scheduled on a
-   TaskTracker, only if the amount of physical memory still unoccupied on the
-   TaskTracker is greater than or equal to this value. Different schedulers can
-   take different decisions, some might just ignore this value. Please refer to
-   the documentation of the scheduler being configured to see if it does
-   memory based scheduling and if it does, how this variable is used by that
-   scheduler.
-  </description>
-</property>
-
 <property>
   <name>mapred.tasktracker.memory_calculator_plugin</name>
   <value></value>

+ 28 - 145
src/mapred/org/apache/hadoop/mapred/JobConf.java

@@ -123,111 +123,11 @@ public class JobConf extends Configuration {
    */
   public static final String DEFAULT_QUEUE_NAME = "default";
 
-  /**
-   * Cluster-wide configuration to be set by the administrators that provides
-   * default amount of maximum virtual memory for job's tasks. This has to be
-   * set on both the JobTracker node for the sake of scheduling decisions and on
-   * the TaskTracker nodes for the sake of memory management.
-   * 
-   * <p>
-   * 
-   * If a job doesn't specify its virtual memory requirement by setting
-   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to {@link #DISABLED_MEMORY_LIMIT},
-   * tasks are assured a memory limit set to this property. This property is
-   * disabled by default, and if not explicitly set to a valid value by the
-   * administrators and if a job doesn't specify its virtual memory
-   * requirements, the job's tasks will not be assured anything and may be
-   * killed by a TT that intends to control the total memory usage of the tasks
-   * via memory management functionality.
-   * 
-   * <p>
-   * 
-   * This value should in general be less than the cluster-wide configuration
-   * {@link #UPPER_LIMIT_ON_TASK_VMEM_PROPERTY}. If not, or if it is not set,
-   * TaskTracker's memory management may be disabled and a scheduler's memory
-   * based scheduling decisions will be affected. Please refer to the
-   * documentation of the configured scheduler to see how this property is used.
-   */
-  public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
-      "mapred.task.default.maxvmem";
+  static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
+      "mapred.job.map.memory.mb";
 
-  /**
-   * The maximum amount of memory any task of this job will use.
-   * 
-   * <p>
-   * 
-   * This value will be used by TaskTrackers for monitoring the memory usage of
-   * tasks of this job. If a TaskTracker's memory management functionality is
-   * enabled, each task of this job will be allowed to use a maximum virtual
-   * memory specified by this property. If the task's memory usage goes over
-   * this value, the task will be failed by the TT. If not set, the cluster-wide
-   * configuration {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} is used as the
-   * default value for memory requirements. If this property cascaded with
-   * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} becomes equal to -1, job's
-   * tasks will not be assured anything and may be killed by a TT that intends
-   * to control the total memory usage of the tasks via memory management
-   * functionality. If the memory management functionality is disabled on a TT,
-   * this value is ignored.
-   * 
-   * <p>
-   * 
-   * This value should also be not more than the cluster-wide configuration
-   * {@link #UPPER_LIMIT_ON_TASK_VMEM_PROPERTY} which has to be set by the site
-   * administrators.
-   * 
-   * <p>
-   * 
-   * This value may be used by schedulers that support scheduling based on job's
-   * memory requirements. In general, a task of this job will be scheduled on a
-   * TaskTracker only if the amount of virtual memory still unoccupied on the
-   * TaskTracker is greater than or equal to this value. But different
-   * schedulers can take different decisions. Please refer to the documentation
-   * of the scheduler being configured to see if it does memory based scheduling
-   * and if it does, how this property is used by that scheduler.
-   * 
-   * @see #setMaxVirtualMemoryForTask(long)
-   * @see #getMaxVirtualMemoryForTask()
-   */
-  public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
-      "mapred.task.maxvmem";
-
-  /**
-   * The maximum amount of physical memory any task of a job will use.
-   * 
-   * <p>
-   * 
-   * This value may be used by schedulers that support scheduling based on job's
-   * memory requirements. In general, a task of this job will be scheduled on a
-   * TaskTracker, only if the amount of physical memory still unoccupied on the
-   * TaskTracker is greater than or equal to this value. But different
-   * schedulers can take different decisions. Please refer to the documentation
-   * of the scheduler being configured to see how it does memory based
-   * scheduling and how this variable is used by that scheduler.
-   * 
-   * @see #setMaxPhysicalMemoryForTask(long)
-   * @see #getMaxPhysicalMemoryForTask()
-   */
-  public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
-      "mapred.task.maxpmem";
-
-  /**
-   * Cluster-wide configuration to be set by the site administrators that
-   * provides an upper limit on the maximum virtual memory that can be specified
-   * by a job. The job configuration {@link #MAPRED_TASK_MAXVMEM_PROPERTY} and
-   * the cluster-wide configuration
-   * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} should, by definition, be
-   * less than this value. If the job configuration
-   * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} is more than this value,
-   * depending on the scheduler being configured, the job may be rejected or the
-   * job configuration may just be ignored.
-   * 
-   * <p>
-   * 
-   * If it is not set on a TaskTracker, TaskTracker's memory management will be
-   * disabled.
-   */
-  public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
-      "mapred.task.limit.maxvmem";
+  static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
+      "mapred.job.reduce.memory.mb";
 
   /**
    * Construct a map/reduce job configuration.
@@ -1491,53 +1391,23 @@ public class JobConf extends Configuration {
   public String getJobLocalDir() {
     return get("job.local.dir");
   }
-  
-  /**
-   * The maximum amount of memory any task of this job will use. See
-   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
-   * 
-   * @return The maximum amount of memory any task of this job will use, in
-   *         bytes.
-   * @see #setMaxVirtualMemoryForTask(long)
-   */
-  public long getMaxVirtualMemoryForTask() {
-    return getLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
+
+  long getMemoryForMapTask() {
+    return getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
+        DISABLED_MEMORY_LIMIT);
   }
 
-  /**
-   * Set the maximum amount of memory any task of this job can use. See
-   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
-   * 
-   * @param vmem Maximum amount of virtual memory in bytes any task of this job
-   *          can use.
-   * @see #getMaxVirtualMemoryForTask()
-   */
-  public void setMaxVirtualMemoryForTask(long vmem) {
-    setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, vmem);
+  void setMemoryForMapTask(long mem) {
+    setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
   }
 
-  /**
-   * The maximum amount of physical memory any task of this job will use. See
-   * {@link #MAPRED_TASK_MAXPMEM_PROPERTY}
-   * 
-   * @return The maximum amount of physical memory any task of this job will
-   *         use, in bytes.
-   * @see #setMaxPhysicalMemoryForTask(long)
-   */
-  public long getMaxPhysicalMemoryForTask() {
-    return getLong(JobConf.MAPRED_TASK_MAXPMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
+  long getMemoryForReduceTask() {
+    return getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
+        DISABLED_MEMORY_LIMIT);
   }
 
-  /**
-   * Set the maximum amount of physical memory any task of this job can use. See
-   * {@link #MAPRED_TASK_MAXPMEM_PROPERTY}
-   * 
-   * @param pmem Maximum amount of physical memory in bytes any task of this job
-   *          can use.
-   * @see #getMaxPhysicalMemoryForTask()
-   */
-  public void setMaxPhysicalMemoryForTask(long pmem) {
-    setLong(JobConf.MAPRED_TASK_MAXPMEM_PROPERTY, pmem);
+  void setMemoryForReduceTask(long mem) {
+    setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
   }
 
   /**
@@ -1559,6 +1429,19 @@ public class JobConf extends Configuration {
     set("mapred.job.queue.name", queueName);
   }
   
+  /**
+   * Normalize the negative values in configuration
+   * 
+   * @param val
+   * @return normalized value
+   */
+  public static long normalizeMemoryConfigValue(long val) {
+    if (val < 0) {
+      val = DISABLED_MEMORY_LIMIT;
+    }
+    return val;
+  }
+
   /** 
    * Find a jar that contains a class of the same name, if any.
    * It will return a jar file, even if that is not the first thing

+ 0 - 21
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -175,8 +175,6 @@ class JobInProgress {
   private boolean hasSpeculativeMaps;
   private boolean hasSpeculativeReduces;
   private long inputLength = 0;
-  private long maxVirtualMemoryForTask;
-  private long maxPhysicalMemoryForTask;
   
   // Per-job counters
   public static enum Counter { 
@@ -283,8 +281,6 @@ class JobInProgress {
     this.nonRunningReduces = new LinkedList<TaskInProgress>();    
     this.runningReduces = new LinkedHashSet<TaskInProgress>();
     this.resourceEstimator = new ResourceEstimator(this);
-    setMaxVirtualMemoryForTask(conf.getMaxVirtualMemoryForTask());
-    setMaxPhysicalMemoryForTask(conf.getMaxPhysicalMemoryForTask());
   }
 
   /**
@@ -568,23 +564,6 @@ class JobInProgress {
     JobHistory.JobInfo.logJobPriority(jobId, priority);
   }
 
-  // Accessors for resources.
-  long getMaxVirtualMemoryForTask() {
-    return maxVirtualMemoryForTask;
-  }
-
-  void setMaxVirtualMemoryForTask(long maxVMem) {
-    maxVirtualMemoryForTask = maxVMem;
-  }
-
-  long getMaxPhysicalMemoryForTask() {
-    return maxPhysicalMemoryForTask;
-  }
-
-  void setMaxPhysicalMemoryForTask(long maxPMem) {
-    maxPhysicalMemoryForTask = maxPMem;
-  }
-
   // Update the job start/launch time (upon restart) and log to history
   synchronized void updateJobInfo(long startTime, long launchTime) {
     // log and change to the job's start/launch time

+ 103 - 0
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -1482,6 +1482,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   Path systemDir = null;
   private JobConf conf;
 
+  long limitMaxMemForMapTasks;
+  long limitMaxMemForReduceTasks;
+  long memSizeForMapSlotOnJT;
+  long memSizeForReduceSlotOnJT;
+
   private QueueManager queueManager;
 
   /**
@@ -1510,6 +1515,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     this.conf = conf;
     JobConf jobConf = new JobConf(conf);
 
+    initializeTaskMemoryRelatedConfig();
+
     // Read the hosts/exclude files to restrict access to the jobtracker.
     this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
                                            conf.get("mapred.hosts.exclude", ""));
@@ -2940,6 +2947,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       throw ioe;
     }
 
+    // Check the job if it cannot run in the cluster because of invalid memory
+    // requirements.
+    try {
+      checkMemoryRequirements(job);
+    } catch (IOException ioe) {
+      new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
+      throw ioe;
+    }
+
    return addJob(jobId, job); 
   }
 
@@ -3199,6 +3215,16 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   }
   
   TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
+
+  static final String MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY =
+      "mapred.cluster.map.memory.mb";
+  static final String MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY =
+      "mapred.cluster.reduce.memory.mb";
+
+  static final String MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY =
+      "mapred.cluster.max.map.memory.mb";
+  static final String MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY =
+      "mapred.cluster.max.reduce.memory.mb";
   
   /* 
    * Returns a list of TaskCompletionEvent for the given job, 
@@ -3595,4 +3621,81 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     }
     SecurityUtil.getPolicy().refresh();
   }
+
+  private void initializeTaskMemoryRelatedConfig() {
+    memSizeForMapSlotOnJT =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    memSizeForReduceSlotOnJT =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    limitMaxMemForMapTasks =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    limitMaxMemForReduceTasks =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    LOG.info(new StringBuilder().append("Scheduler configured with ").append(
+        "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT,").append(
+        " limitMaxMemForMapTasks, limitMaxMemForReduceTasks) (").append(
+        memSizeForMapSlotOnJT).append(", ").append(memSizeForReduceSlotOnJT)
+        .append(", ").append(limitMaxMemForMapTasks).append(", ").append(
+            limitMaxMemForReduceTasks).append(")"));
+  }
+
+  private boolean perTaskMemoryConfigurationSetOnJT() {
+    if (limitMaxMemForMapTasks == JobConf.DISABLED_MEMORY_LIMIT
+        || limitMaxMemForReduceTasks == JobConf.DISABLED_MEMORY_LIMIT
+        || memSizeForMapSlotOnJT == JobConf.DISABLED_MEMORY_LIMIT
+        || memSizeForReduceSlotOnJT == JobConf.DISABLED_MEMORY_LIMIT) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Check the job if it has invalid requirements and throw an IOException if it does.
+   * 
+   * @param job
+   * @throws IOException 
+   */
+  private void checkMemoryRequirements(JobInProgress job)
+      throws IOException {
+    if (!perTaskMemoryConfigurationSetOnJT()) {
+      LOG.debug("Per-Task memory configuration is not set on JT. "
+          + "Not checking the job for invalid memory requirements.");
+      return;
+    }
+
+    boolean invalidJob = false;
+    String msg = "";
+    long maxMemForMapTask = job.getJobConf().getMemoryForMapTask();
+    long maxMemForReduceTask = job.getJobConf().getMemoryForReduceTask();
+
+    if (maxMemForMapTask == JobConf.DISABLED_MEMORY_LIMIT
+        || maxMemForReduceTask == JobConf.DISABLED_MEMORY_LIMIT) {
+      invalidJob = true;
+      msg = "Invalid job requirements.";
+    }
+
+    if (maxMemForMapTask > limitMaxMemForMapTasks
+        || maxMemForReduceTask > limitMaxMemForReduceTasks) {
+      invalidJob = true;
+      msg = "Exceeds the cluster's max-memory-limit.";
+    }
+
+    if (invalidJob) {
+      StringBuilder jobStr =
+          new StringBuilder().append(job.getJobID().toString()).append("(")
+              .append(maxMemForMapTask).append(" memForMapTasks ").append(
+                  maxMemForReduceTask).append(" memForReduceTasks): ");
+      LOG.warn(jobStr.toString() + msg);
+
+      throw new IOException(jobStr.toString() + msg);
+    }
+  }
 }

+ 5 - 18
src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java

@@ -59,8 +59,7 @@ class TaskMemoryManagerThread extends Thread {
     tasksToBeRemoved = new ArrayList<TaskAttemptID>();
 
     maxMemoryAllowedForAllTasks =
-        taskTracker.getTotalVirtualMemoryOnTT()
-            - taskTracker.getReservedVirtualMemory();
+        taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L;
 
     monitoringInterval = taskTracker.getJobConf().getLong(
         "mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L);
@@ -205,17 +204,6 @@ class TaskMemoryManagerThread extends Thread {
           LOG.info("Memory usage of ProcessTree " + pId + " :"
               + currentMemUsage + "bytes. Limit : " + limit + "bytes");
 
-          if (limit > taskTracker.getLimitMaxVMemPerTask()) {
-            // TODO: With monitoring enabled and no scheduling based on
-            // memory,users can seriously hijack the system by specifying memory
-            // requirements well above the cluster wide limit. Ideally these
-            // jobs
-            // should have been rejected by JT/scheduler. Because we can't do
-            // that, in the minimum we should fail the tasks and hence the job.
-            LOG.warn("Task " + tid
-                + " 's maxVmemPerTask is greater than TT's limitMaxVmPerTask");
-          }
-
           if (limit != JobConf.DISABLED_MEMORY_LIMIT
               && currentMemUsage > limit) {
             // Task (the root process) is still alive and overflowing memory.
@@ -245,12 +233,11 @@ class TaskMemoryManagerThread extends Thread {
         }
       }
 
-      LOG.debug("Memory still in usage across all tasks : " + memoryStillInUsage
-          + "bytes. Total limit : " + maxMemoryAllowedForAllTasks);
-
       if (memoryStillInUsage > maxMemoryAllowedForAllTasks) {
-        LOG.warn("The total memory usage is still overflowing TTs limits."
-            + " Trying to kill a few tasks with the least progress.");
+        LOG.warn("The total memory in usage " + memoryStillInUsage
+            + " is still overflowing TTs limits "
+            + maxMemoryAllowedForAllTasks
+            + ". Trying to kill a few tasks with the least progress.");
         killTasksWithLeastProgress(memoryStillInUsage);
       }
     

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/TaskRunner.java

@@ -373,7 +373,7 @@ abstract class TaskRunner extends Thread {
              taskid.toString(), t.isTaskCleanupTask())),
             this.conf).toString();
       t.setPidFile(pidFile);
-      tracker.addToMemoryManager(t.getTaskID(), conf, pidFile);
+      tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf, pidFile);
 
       // set memory limit using ulimit if feasible and necessary ...
       String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);

+ 42 - 172
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -197,58 +197,10 @@ public class TaskTracker
   private TaskMemoryManagerThread taskMemoryManager;
   private boolean taskMemoryManagerEnabled = true;
   private long totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
-  private long totalPmemOnTT = JobConf.DISABLED_MEMORY_LIMIT;
-  private long reservedVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
-  private long reservedPmem = JobConf.DISABLED_MEMORY_LIMIT;
-
-  // Cluster wide default value for max-vm per task
-  private long defaultMaxVmPerTask = JobConf.DISABLED_MEMORY_LIMIT;
-  // Cluster wide upper limit on max-vm per task
-  private long limitMaxVmPerTask = JobConf.DISABLED_MEMORY_LIMIT;
-
-  /**
-   * Configuration property to specify the amount of virtual memory that has to
-   * be reserved by the TaskTracker for system usage (OS, TT etc). The reserved
-   * virtual memory should be a part of the total virtual memory available on
-   * the TaskTracker. TaskTracker obtains the total virtual memory available on
-   * the system by using a {@link MemoryCalculatorPlugin}. The total physical
-   * memory is set to {@link JobConf#DISABLED_MEMORY_LIMIT} on systems lacking a
-   * MemoryCalculatorPlugin implementation.
-   * 
-   * <p>
-   * 
-   * The reserved virtual memory and the total virtual memory values are
-   * reported by the TaskTracker as part of heart-beat so that they can
-   * be considered by a scheduler.
-   * 
-   * <p>
-   * 
-   * These two values are also used by the TaskTracker for tracking tasks'
-   * memory usage. Memory management functionality on a TaskTracker is disabled
-   * if this property is not set, if it is more than the total virtual memory
-   * reported by MemoryCalculatorPlugin, or if either of the values is negative.
-   */
-  static final String MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY =
-      "mapred.tasktracker.vmem.reserved";
-
-  /**
-   * Configuration property to specify the amount of physical memory that has to
-   * be reserved by the TaskTracker for system usage (OS, TT etc). The reserved
-   * physical memory should be a part of the total physical memory available on
-   * the TaskTracker. TaskTracker obtains the total physical memory available on
-   * the system by using a {@link MemoryCalculatorPlugin}. The total physical
-   * memory is set to {@link JobConf#DISABLED_MEMORY_LIMIT} on systems lacking a
-   * MemoryCalculatorPlugin implementation.
-   * 
-   * <p>
-   * 
-   * The reserved physical memory and the total physical memory values are
-   * reported by the TaskTracker as part of heart-beat so that they can
-   * be considered by a scheduler.
-   * 
-   */
-  static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY =
-      "mapred.tasktracker.pmem.reserved";
+  private long totalPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+  private long mapSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+  private long reduceSlotSizeMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+  private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
 
   static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
       "mapred.tasktracker.memory_calculator_plugin";
@@ -1232,14 +1184,14 @@ public class TaskTracker
       long freeDiskSpace = getFreeSpace();
       long totVmem = getTotalVirtualMemoryOnTT();
       long totPmem = getTotalPhysicalMemoryOnTT();
-      long rsrvdVmem = getReservedVirtualMemory();
-      long rsrvdPmem = getReservedPhysicalMemory();
 
       status.getResourceStatus().setAvailableSpace(freeDiskSpace);
       status.getResourceStatus().setTotalVirtualMemory(totVmem);
       status.getResourceStatus().setTotalPhysicalMemory(totPmem);
-      status.getResourceStatus().setReservedVirtualMemory(rsrvdVmem);
-      status.getResourceStatus().setReservedPhysicalMemory(rsrvdPmem);
+      status.getResourceStatus().setMapSlotMemorySizeOnTT(
+          mapSlotMemorySizeOnTT);
+      status.getResourceStatus().setReduceSlotMemorySizeOnTT(
+          reduceSlotSizeMemoryOnTT);
     }
       
     //
@@ -1302,53 +1254,11 @@ public class TaskTracker
    * @return total size of physical memory.
    */
   long getTotalPhysicalMemoryOnTT() {
-    return totalPmemOnTT;
+    return totalPhysicalMemoryOnTT;
   }
 
-  /**
-   * Return the amount of virtual memory reserved on the TaskTracker for system
-   * usage (OS, TT etc).
-   */
-  long getReservedVirtualMemory() {
-    return reservedVirtualMemory;
-  }
-
-  /**
-   * Return the amount of physical memory reserved on the TaskTracker for system
-   * usage (OS, TT etc).
-   */
-  long getReservedPhysicalMemory() {
-    return reservedPmem;
-  }
-
-  /**
-   * Return the limit on the maxVMemPerTask on this TaskTracker
-   * @return limitMaxVmPerTask
-   */
-  long getLimitMaxVMemPerTask() {
-    return limitMaxVmPerTask;
-  }
-
-  /**
-   * Obtain the virtual memory allocated for a TIP.
-   * 
-   * If the TIP's job has a configured value for the max-virtual memory, that
-   * will be returned. Else, the cluster-wide default maxvirtual memory for
-   * tasks is returned.
-   * 
-   * @param conf
-   * @return the virtual memory allocated for the TIP.
-   */
-  long getVirtualMemoryForTask(JobConf conf) {
-    long vMemForTask =
-        normalizeMemoryConfigValue(conf.getMaxVirtualMemoryForTask());
-    if (vMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      vMemForTask =
-          normalizeMemoryConfigValue(fConf.getLong(
-              JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-              JobConf.DISABLED_MEMORY_LIMIT));
-    }
-    return vMemForTask;
+  long getTotalMemoryAllottedForTasksOnTT() {
+    return totalMemoryAllottedForTasks;
   }
 
   /**
@@ -1621,7 +1531,6 @@ public class TaskTracker
 
   private TaskLauncher mapLauncher;
   private TaskLauncher reduceLauncher;
-      
   public JvmManager getJvmManagerInstance() {
     return jvmManager;
   }
@@ -1759,12 +1668,14 @@ public class TaskTracker
     }
   }
   
-  void addToMemoryManager(TaskAttemptID attemptId, 
+  void addToMemoryManager(TaskAttemptID attemptId, boolean isMap,
                           JobConf conf, 
                           String pidFile) {
     if (isTaskMemoryManagerEnabled()) {
       taskMemoryManager.addTask(attemptId, 
-        getVirtualMemoryForTask(conf), pidFile);
+          isMap ? conf
+              .getMemoryForMapTask() * 1024 * 1024L : conf
+              .getMemoryForReduceTask() * 1024 * 1024L, pidFile);
     }
   }
 
@@ -3098,33 +3009,35 @@ public class TaskTracker
             + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
         totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
       }
-      totalPmemOnTT = memoryCalculatorPlugin.getPhysicalMemorySize();
-      if (totalPmemOnTT <= 0) {
+      totalPhysicalMemoryOnTT = memoryCalculatorPlugin.getPhysicalMemorySize();
+      if (totalPhysicalMemoryOnTT <= 0) {
         LOG.warn("TaskTracker's totalPmem could not be calculated. "
             + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
-        totalPmemOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+        totalPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
       }
     }
 
-    reservedVirtualMemory =
-        normalizeMemoryConfigValue(fConf.getLong(
-            TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
-
-    reservedPmem =
-        normalizeMemoryConfigValue(fConf.getLong(
-            TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
-
-    defaultMaxVmPerTask =
-        normalizeMemoryConfigValue(fConf.getLong(
-            JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
-
-    limitMaxVmPerTask =
-        normalizeMemoryConfigValue(fConf.getLong(
-            JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
+    mapSlotMemorySizeOnTT =
+        fConf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT);
+    reduceSlotSizeMemoryOnTT =
+        fConf.getLong(
+            JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT);
+    totalMemoryAllottedForTasks =
+        maxCurrentMapTasks * mapSlotMemorySizeOnTT + maxCurrentReduceTasks
+            * reduceSlotSizeMemoryOnTT;
+    if (totalMemoryAllottedForTasks < 0) {
+      totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
+    }
+    if (totalMemoryAllottedForTasks > totalPhysicalMemoryOnTT) {
+      LOG.info("totalMemoryAllottedForTasks > totalPhysicalMemoryOnTT."
+          + " Thrashing might happen.");
+    } else if (totalMemoryAllottedForTasks > totalVirtualMemoryOnTT) {
+      LOG.info("totalMemoryAllottedForTasks > totalVirtualMemoryOnTT."
+          + " Thrashing might happen.");
+    }
 
     // start the taskMemoryManager thread only if enabled
     setTaskMemoryManagerEnabledFlag();
@@ -3143,55 +3056,12 @@ public class TaskTracker
       return;
     }
 
-    // /// Missing configuration
-    StringBuilder mesg = new StringBuilder();
-
-    long totalVmemOnTT = getTotalVirtualMemoryOnTT();
-    if (totalVmemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
-      mesg.append("TaskTracker's totalVmem could not be calculated.\n");
-      taskMemoryManagerEnabled = false;
-    }
-
-    long reservedVmem = getReservedVirtualMemory();
-    if (reservedVmem == JobConf.DISABLED_MEMORY_LIMIT) {
-      mesg.append("TaskTracker's reservedVmem is not configured.\n");
-      taskMemoryManagerEnabled = false;
-    }
-
-    if (defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      mesg.append("TaskTracker's defaultMaxVmPerTask is not configured.\n");
+    if (totalMemoryAllottedForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
       taskMemoryManagerEnabled = false;
-    }
-
-    if (limitMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      mesg.append("TaskTracker's limitMaxVmPerTask is not configured.\n");
-      taskMemoryManagerEnabled = false;
-    }
-
-    if (!taskMemoryManagerEnabled) {
-      LOG.warn(mesg.toString() + "TaskMemoryManager is disabled.");
-      return;
-    }
-    // ///// End of missing configuration
-
-    // ///// Mis-configuration
-    if (defaultMaxVmPerTask > limitMaxVmPerTask) {
-      mesg.append("defaultMaxVmPerTask is mis-configured. "
-          + "It shouldn't be greater than limitMaxVmPerTask. ");
-      taskMemoryManagerEnabled = false;
-    }
-
-    if (reservedVmem > totalVmemOnTT) {
-      mesg.append("reservedVmemOnTT is mis-configured. "
-          + "It shouldn't be greater than totalVmemOnTT");
-      taskMemoryManagerEnabled = false;
-    }
-
-    if (!taskMemoryManagerEnabled) {
-      LOG.warn(mesg.toString() + "TaskMemoryManager is disabled.");
+      LOG.warn("TaskTracker's totalMemoryAllottedForTasks is -1."
+          + " TaskMemoryManager is disabled.");
       return;
     }
-    // ///// End of mis-configuration
 
     taskMemoryManagerEnabled = true;
   }

+ 43 - 35
src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java

@@ -55,16 +55,16 @@ class TaskTrackerStatus implements Writable {
   static class ResourceStatus implements Writable {
     
     private long totalVirtualMemory;
-    private long reservedVirtualMemory;
     private long totalPhysicalMemory;
-    private long reservedPhysicalMemory;
+    private long mapSlotMemorySizeOnTT;
+    private long reduceSlotMemorySizeOnTT;
     private long availableSpace;
     
     ResourceStatus() {
       totalVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
-      reservedVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
       totalPhysicalMemory = JobConf.DISABLED_MEMORY_LIMIT;
-      reservedPhysicalMemory = JobConf.DISABLED_MEMORY_LIMIT;
+      mapSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+      reduceSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT;
       availableSpace = Long.MAX_VALUE;
     }
 
@@ -89,24 +89,6 @@ class TaskTrackerStatus implements Writable {
       return totalVirtualMemory;
     }
 
-    /**
-     * Set the amount of virtual memory reserved on the TaskTracker for system
-     * usage (OS, TT etc).
-     * 
-     * @param reservedVmem amount of virtual memory reserved in bytes.
-     */
-    void setReservedVirtualMemory(long reservedVmem) {
-      reservedVirtualMemory = reservedVmem;
-    }
-
-    /**
-     * Get the amount of virtual memory reserved on the TaskTracker for system
-     * usage (OS, TT etc).
-     */
-    long getReservedTotalMemory() {
-      return reservedVirtualMemory;
-    }
-
     /**
      * Set the maximum amount of physical memory on the tasktracker.
      * 
@@ -130,23 +112,49 @@ class TaskTrackerStatus implements Writable {
     }
 
     /**
-     * Set the amount of physical memory reserved on the TaskTracker for system
-     * usage (OS, TT etc).
+     * Set the memory size of each map slot on this TT. This will be used by JT
+     * for accounting more slots for jobs that use more memory.
+     * 
+     * @param mem memory size of each map slot on this TT
+     */
+    void setMapSlotMemorySizeOnTT(long mem) {
+      mapSlotMemorySizeOnTT = mem;
+    }
+
+    /**
+     * Get the memory size of each map slot on this TT. See
+     * {@link #setMapSlotMemorySizeOnTT(long)}
      * 
-     * @param reservedPmem amount of physical memory reserved in bytes.
+     * @return the memory size of each map slot on this TT
      */
-    void setReservedPhysicalMemory(long reservedPmem) {
-      reservedPhysicalMemory = reservedPmem;
+    long getMapSlotMemorySizeOnTT() {
+      return mapSlotMemorySizeOnTT;
     }
 
     /**
-     * Get the amount of physical memory reserved on the TaskTracker for system
-     * usage (OS, TT etc).
+     * Set the memory size of each reduce slot on this TT. This will be used by
+     * JT for accounting more slots for jobs that use more memory.
+     * 
+     * @param mem memory size of each reduce slot on this TT
      */
-    long getReservedPhysicalMemory() {
-      return reservedPhysicalMemory;
+    void setReduceSlotMemorySizeOnTT(long mem) {
+      reduceSlotMemorySizeOnTT = mem;
     }
 
+    /**
+     * Get the memory size of each reduce slot on this TT. See
+     * {@link #setReduceSlotMemorySizeOnTT(long)}
+     * 
+     * @return the memory size of each reduce slot on this TT
+     */
+    long getReduceSlotMemorySizeOnTT() {
+      return reduceSlotMemorySizeOnTT;
+    }
+
+    /**
+     * Set the available disk space on the TT
+     * @param availSpace the available disk space on the TT
+     */
     void setAvailableSpace(long availSpace) {
       availableSpace = availSpace;
     }
@@ -161,17 +169,17 @@ class TaskTrackerStatus implements Writable {
     
     public void write(DataOutput out) throws IOException {
       WritableUtils.writeVLong(out, totalVirtualMemory);
-      WritableUtils.writeVLong(out, reservedVirtualMemory);
       WritableUtils.writeVLong(out, totalPhysicalMemory);
-      WritableUtils.writeVLong(out, reservedPhysicalMemory);
+      WritableUtils.writeVLong(out, mapSlotMemorySizeOnTT);
+      WritableUtils.writeVLong(out, reduceSlotMemorySizeOnTT);
       WritableUtils.writeVLong(out, availableSpace);
     }
     
     public void readFields(DataInput in) throws IOException {
       totalVirtualMemory = WritableUtils.readVLong(in);
-      reservedVirtualMemory = WritableUtils.readVLong(in);
       totalPhysicalMemory = WritableUtils.readVLong(in);
-      reservedPhysicalMemory = WritableUtils.readVLong(in);
+      mapSlotMemorySizeOnTT = WritableUtils.readVLong(in);
+      reduceSlotMemorySizeOnTT = WritableUtils.readVLong(in);
       availableSpace = WritableUtils.readVLong(in);
     }
   }

+ 111 - 0
src/test/org/apache/hadoop/mapred/TestSubmitJob.java

@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.ToolRunner;
+
+import junit.framework.TestCase;
+
+public class TestSubmitJob extends TestCase {
+  private MiniMRCluster miniMRCluster;
+
+  @Override
+  protected void tearDown()
+      throws Exception {
+    if (miniMRCluster != null) {
+      miniMRCluster.shutdown();
+    }
+  }
+
+  /**
+   * Test to verify that jobs with invalid memory requirements are killed at the
+   * JT.
+   * 
+   * @throws Exception
+   */
+  public void testJobWithInvalidMemoryReqs()
+      throws Exception {
+    JobConf jtConf = new JobConf();
+    jtConf
+        .setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024L);
+    jtConf.setLong(JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+        2 * 1024L);
+    jtConf.setLong(JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+        3 * 1024L);
+    jtConf.setLong(JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+        4 * 1024L);
+
+    miniMRCluster = new MiniMRCluster(0, "file:///", 0, null, null, jtConf);
+
+    JobConf clusterConf = miniMRCluster.createJobConf();
+
+    // No map-memory configuration
+    JobConf jobConf = new JobConf(clusterConf);
+    jobConf.setMemoryForReduceTask(1 * 1024L);
+    runJobAndVerifyFailure(jobConf, JobConf.DISABLED_MEMORY_LIMIT, 1 * 1024L,
+        "Invalid job requirements.");
+
+    // No reduce-memory configuration
+    jobConf = new JobConf(clusterConf);
+    jobConf.setMemoryForMapTask(1 * 1024L);
+    runJobAndVerifyFailure(jobConf, 1 * 1024L, JobConf.DISABLED_MEMORY_LIMIT,
+        "Invalid job requirements.");
+
+    // Invalid map-memory configuration
+    jobConf = new JobConf(clusterConf);
+    jobConf.setMemoryForMapTask(4 * 1024L);
+    jobConf.setMemoryForReduceTask(1 * 1024L);
+    runJobAndVerifyFailure(jobConf, 4 * 1024L, 1 * 1024L,
+        "Exceeds the cluster's max-memory-limit.");
+
+    // Invalid reduce-memory configuration
+    jobConf = new JobConf(clusterConf);
+    jobConf.setMemoryForMapTask(1 * 1024L);
+    jobConf.setMemoryForReduceTask(5 * 1024L);
+    runJobAndVerifyFailure(jobConf, 1 * 1024L, 5 * 1024L,
+        "Exceeds the cluster's max-memory-limit.");
+  }
+
+  private void runJobAndVerifyFailure(JobConf jobConf, long memForMapTasks,
+      long memForReduceTasks, String expectedMsg)
+      throws Exception,
+      IOException {
+    String[] args = { "-m", "0", "-r", "0", "-mt", "0", "-rt", "0" };
+    boolean throwsException = false;
+    String msg = null;
+    try {
+      ToolRunner.run(jobConf, new SleepJob(), args);
+    } catch (RemoteException re) {
+      throwsException = true;
+      msg = re.unwrapRemoteException().getMessage();
+    }
+    assertTrue(throwsException);
+    assertNotNull(msg);
+
+    String overallExpectedMsg =
+        "(" + memForMapTasks + " memForMapTasks " + memForReduceTasks
+            + " memForReduceTasks): " + expectedMsg;
+    assertTrue("Observed message - " + msg
+        + " - doesn't contain expected message - " + overallExpectedMsg, msg
+        .contains(overallExpectedMsg));
+  }
+}

+ 44 - 57
src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java

@@ -22,10 +22,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.SleepJob;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.util.LinuxMemoryCalculatorPlugin;
 import org.apache.hadoop.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.util.ToolRunner;
@@ -46,7 +43,6 @@ public class TestTTMemoryReporting extends TestCase {
 
   static final Log LOG = LogFactory.getLog(TestTTMemoryReporting.class);
   
-  private MiniDFSCluster miniDFSCluster;
   private MiniMRCluster miniMRCluster;
 
   /**
@@ -77,41 +73,42 @@ public class TestTTMemoryReporting extends TestCase {
           getConf().getLong("totalVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
       long totalPhysicalMemoryOnTT =
           getConf().getLong("totalPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
-      long virtualMemoryReservedOnTT =
-          getConf().getLong("reservedVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
-      long physicalMemoryReservedOnTT =
-          getConf().getLong("reservedPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
+      long mapSlotMemorySize =
+          getConf().getLong("mapSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT);
+      long reduceSlotMemorySize =
+          getConf()
+              .getLong("reduceSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT);
 
       long reportedTotalVirtualMemoryOnTT =
           status.getResourceStatus().getTotalVirtualMemory();
       long reportedTotalPhysicalMemoryOnTT =
           status.getResourceStatus().getTotalPhysicalMemory();
-      long reportedVirtualMemoryReservedOnTT =
-          status.getResourceStatus().getReservedTotalMemory();
-      long reportedPhysicalMemoryReservedOnTT =
-          status.getResourceStatus().getReservedPhysicalMemory();
+      long reportedMapSlotMemorySize =
+          status.getResourceStatus().getMapSlotMemorySizeOnTT();
+      long reportedReduceSlotMemorySize =
+          status.getResourceStatus().getReduceSlotMemorySizeOnTT();
 
       message =
           "expected memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
-              + "virtualMemoryReservedOnTT, physicalMemoryReservedOnTT) = ("
-              + totalVirtualMemoryOnTT + ", " + totalPhysicalMemoryOnTT + ", "
-              + virtualMemoryReservedOnTT + ", " + physicalMemoryReservedOnTT
-              + ")";
+              + "mapSlotMemSize, reduceSlotMemorySize) = ("
+              + totalVirtualMemoryOnTT + ", " + totalPhysicalMemoryOnTT + ","
+              + mapSlotMemorySize + "," + reduceSlotMemorySize + ")";
       message +=
           "\nreported memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
-              + "virtualMemoryReservedOnTT, physicalMemoryReservedOnTT) = ("
+              + "reportedMapSlotMemorySize, reportedReduceSlotMemorySize) = ("
               + reportedTotalVirtualMemoryOnTT
               + ", "
               + reportedTotalPhysicalMemoryOnTT
-              + ", "
-              + reportedVirtualMemoryReservedOnTT
-              + ", "
-              + reportedPhysicalMemoryReservedOnTT + ")";
+              + ","
+              + reportedMapSlotMemorySize
+              + ","
+              + reportedReduceSlotMemorySize
+              + ")";
       LOG.info(message);
       if (totalVirtualMemoryOnTT != reportedTotalVirtualMemoryOnTT
           || totalPhysicalMemoryOnTT != reportedTotalPhysicalMemoryOnTT
-          || virtualMemoryReservedOnTT != reportedVirtualMemoryReservedOnTT
-          || physicalMemoryReservedOnTT != reportedPhysicalMemoryReservedOnTT) {
+          || mapSlotMemorySize != reportedMapSlotMemorySize
+          || reduceSlotMemorySize != reportedReduceSlotMemorySize) {
         hasPassed = false;
       }
       return super.assignTasks(status);
@@ -132,7 +129,7 @@ public class TestTTMemoryReporting extends TestCase {
           TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
           DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
       setUpCluster(conf);
-      runSleepJob();
+      runSleepJob(miniMRCluster.createJobConf());
       verifyTestResults();
     } finally {
       tearDownCluster();
@@ -149,8 +146,9 @@ public class TestTTMemoryReporting extends TestCase {
     JobConf conf = new JobConf();
     conf.setLong("totalVmemOnTT", 4 * 1024 * 1024 * 1024L);
     conf.setLong("totalPmemOnTT", 2 * 1024 * 1024 * 1024L);
-    conf.setLong("reservedVmemOnTT", 1 * 1024 * 1024 * 1024L);
-    conf.setLong("reservedPmemOnTT", 512 * 1024 * 1024L);
+    conf.setLong("mapSlotMemorySize", 1 * 512L);
+    conf.setLong("reduceSlotMemorySize", 1 * 1024L);
+
     conf.setClass(
         TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
         DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
@@ -158,15 +156,17 @@ public class TestTTMemoryReporting extends TestCase {
         4 * 1024 * 1024 * 1024L);
     conf.setLong(DummyMemoryCalculatorPlugin.MAXPMEM_TESTING_PROPERTY,
         2 * 1024 * 1024 * 1024L);
+    conf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+        512L);
     conf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
-        1 * 1024 * 1024 * 1024L);
-    conf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
-        512 * 1024 * 1024L);
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1024L);
+    
     try {
       setUpCluster(conf);
-      runSleepJob();
+      JobConf jobConf = miniMRCluster.createJobConf();
+      jobConf.setMemoryForMapTask(1 * 1024L);
+      jobConf.setMemoryForReduceTask(2 * 1024L);
+      runSleepJob(jobConf);
       verifyTestResults();
     } finally {
       tearDownCluster();
@@ -189,17 +189,10 @@ public class TestTTMemoryReporting extends TestCase {
     LinuxMemoryCalculatorPlugin plugin = new LinuxMemoryCalculatorPlugin();
     conf.setLong("totalVmemOnTT", plugin.getVirtualMemorySize());
     conf.setLong("totalPmemOnTT", plugin.getPhysicalMemorySize());
-    conf.setLong("reservedVmemOnTT", 1 * 1024 * 1024 * 1024L);
-    conf.setLong("reservedPmemOnTT", 512 * 1024 * 1024L);
-    conf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
-        1 * 1024 * 1024 * 1024L);
-    conf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
-        512 * 1024 * 1024L);
+
     try {
       setUpCluster(conf);
-      runSleepJob();
+      runSleepJob(miniMRCluster.createJobConf());
       verifyTestResults();
     } finally {
       tearDownCluster();
@@ -208,22 +201,15 @@ public class TestTTMemoryReporting extends TestCase {
 
   private void setUpCluster(JobConf conf)
                                 throws Exception {
-    conf.setClass("mapred.jobtracker.taskScheduler", 
-        TestTTMemoryReporting.FakeTaskScheduler.class,
-        TaskScheduler.class);
-    miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
-    FileSystem fileSys = miniDFSCluster.getFileSystem();
-    String namenode = fileSys.getUri().toString();
-    miniMRCluster = new MiniMRCluster(1, namenode, 3, 
-                      null, null, conf);    
+    conf.setClass("mapred.jobtracker.taskScheduler",
+        TestTTMemoryReporting.FakeTaskScheduler.class, TaskScheduler.class);
+    conf.set("mapred.job.tracker.handler.count", "1");
+    miniMRCluster = new MiniMRCluster(1, "file:///", 3, null, null, conf);
   }
   
-  private void runSleepJob() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set("mapred.job.tracker", "localhost:"
-                              + miniMRCluster.getJobTrackerPort());
+  private void runSleepJob(JobConf conf) throws Exception {
     String[] args = { "-m", "1", "-r", "1",
-                      "-mt", "1000", "-rt", "1000" };
+                      "-mt", "10", "-rt", "10" };
     ToolRunner.run(conf, new SleepJob(), args);
   }
 
@@ -235,7 +221,8 @@ public class TestTTMemoryReporting extends TestCase {
   }
   
   private void tearDownCluster() {
-    if (miniMRCluster != null) { miniMRCluster.shutdown(); }
-    if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
+    if (miniMRCluster != null) {
+      miniMRCluster.shutdown();
+    }
   }
-}
+}

+ 67 - 131
src/test/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java

@@ -19,6 +19,9 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.regex.Pattern;
 import java.util.regex.Matcher;
 
@@ -42,18 +45,19 @@ public class TestTaskTrackerMemoryManager extends TestCase {
 
   private static final Log LOG =
       LogFactory.getLog(TestTaskTrackerMemoryManager.class);
-  private MiniDFSCluster miniDFSCluster;
   private MiniMRCluster miniMRCluster;
 
   private String taskOverLimitPatternString =
       "TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond memory-limits. "
           + "Current usage : [0-9]*bytes. Limit : %sbytes. Killing task.";
 
-  private void startCluster(JobConf conf) throws Exception {
-    miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
-    FileSystem fileSys = miniDFSCluster.getFileSystem();
-    String namenode = fileSys.getUri().toString();
-    miniMRCluster = new MiniMRCluster(1, namenode, 1, null, null, conf);
+  private void startCluster(JobConf conf)
+      throws Exception {
+    conf.set("mapred.job.tracker.handler.count", "1");
+    conf.set("mapred.tasktracker.map.tasks.maximum", "1");
+    conf.set("mapred.tasktracker.reduce.tasks.maximum", "1");
+    conf.set("mapred.tasktracker.tasks.sleeptime-before-sigkill", "0");
+    miniMRCluster = new MiniMRCluster(1, "file:///", 1, null, null, conf);
   }
 
   @Override
@@ -61,9 +65,6 @@ public class TestTaskTrackerMemoryManager extends TestCase {
     if (miniMRCluster != null) {
       miniMRCluster.shutdown();
     }
-    if (miniDFSCluster != null) {
-      miniDFSCluster.shutdown();
-    }
   }
 
   private void runSleepJob(JobConf conf) throws Exception {
@@ -73,15 +74,6 @@ public class TestTaskTrackerMemoryManager extends TestCase {
 
   private void runAndCheckSuccessfulJob(JobConf conf)
       throws IOException {
-    // Set up job.
-    JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
-    conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
-        + jt.getTrackerPort());
-    NameNode nn = miniDFSCluster.getNameNode();
-    conf.set("fs.default.name", "hdfs://"
-        + nn.getNameNodeAddress().getHostName() + ":"
-        + nn.getNameNodeAddress().getPort());
-
     Pattern taskOverLimitPattern =
         Pattern.compile(String.format(taskOverLimitPatternString, "[0-9]*"));
     Matcher mat = null;
@@ -148,43 +140,12 @@ public class TestTaskTrackerMemoryManager extends TestCase {
       return;
     }
 
-    JobConf conf = new JobConf();
     // Task-memory management disabled by default.
-    startCluster(conf);
-    long PER_TASK_LIMIT = 100L; // Doesn't matter how low.
-    conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
-    runAndCheckSuccessfulJob(conf);
-  }
-
-  /**
-   * Test for verifying that tasks with no limits, with the cumulative usage
-   * still under TT's limits, succeed.
-   * 
-   * @throws Exception
-   */
-  public void testTasksWithNoLimits()
-      throws Exception {
-    // Run the test only if memory management is enabled
-    if (!isProcfsBasedTreeAvailable()) {
-      return;
-    }
-
-    // Fairly large value for sleepJob to succeed
-    long ttLimit = 4 * 1024 * 1024 * 1024L;
-    // Start cluster with proper configuration.
-    JobConf fConf = new JobConf();
-
-    fConf.setClass(
-        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
-        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
-    fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
-        ttLimit);
-    fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, ttLimit);
-    fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, ttLimit);
-    fConf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
-    startCluster(fConf);
-    JobConf conf = new JobConf();
+    startCluster(new JobConf());
+    long PER_TASK_LIMIT = 1L; // Doesn't matter how low.
+    JobConf conf = miniMRCluster.createJobConf();
+    conf.setMemoryForMapTask(PER_TASK_LIMIT);
+    conf.setMemoryForReduceTask(PER_TASK_LIMIT);
     runAndCheckSuccessfulJob(conf);
   }
 
@@ -202,33 +163,25 @@ public class TestTaskTrackerMemoryManager extends TestCase {
     }
 
     // Large so that sleepjob goes through and fits total TT usage
-    long PER_TASK_LIMIT = 2 * 1024 * 1024 * 1024L;
-    long TASK_TRACKER_LIMIT = 4 * 1024 * 1024 * 1024L;
+    long PER_TASK_LIMIT = 2 * 1024L;
 
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
-
-    fConf.setClass(
-        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
-        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
-    fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        TASK_TRACKER_LIMIT);
+    fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+        2 * 1024L);
     fConf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
-    startCluster(fConf);
-    JobConf conf = new JobConf();
-    conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
-    runAndCheckSuccessfulJob(conf);
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+        2 * 1024L);
+    startCluster(new JobConf());
 
+    JobConf conf = new JobConf(miniMRCluster.createJobConf());
+    conf.setMemoryForMapTask(PER_TASK_LIMIT);
+    conf.setMemoryForReduceTask(PER_TASK_LIMIT);
+    runAndCheckSuccessfulJob(conf);
   }
 
   /**
-   * Test for verifying that tasks that go beyond limits, though the cumulative
-   * usage is under TT's limits, get killed.
+   * Test for verifying that tasks that go beyond limits get killed.
    * 
    * @throws Exception
    */
@@ -240,43 +193,32 @@ public class TestTaskTrackerMemoryManager extends TestCase {
       return;
     }
 
-    long PER_TASK_LIMIT = 444; // Low enough to kill off sleepJob tasks.
-    long TASK_TRACKER_LIMIT = 4 * 1024 * 1024 * 1024L; // Large so as to fit
-    // total usage
+    long PER_TASK_LIMIT = 1L; // Low enough to kill off sleepJob tasks.
+
     Pattern taskOverLimitPattern =
         Pattern.compile(String.format(taskOverLimitPatternString, String
-            .valueOf(PER_TASK_LIMIT)));
+            .valueOf(PER_TASK_LIMIT*1024*1024L)));
     Matcher mat = null;
 
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
-    fConf.setClass(
-        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
-        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
-    fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
 
     // very small value, so that no task escapes to successful completion.
     fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
         String.valueOf(300));
+    fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+        2 * 1024);
+    fConf.setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+        2 * 1024);
     startCluster(fConf);
 
     // Set up job.
-    JobConf conf = new JobConf();
-    conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
-    JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
-    conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
-        + jt.getTrackerPort());
-    NameNode nn = miniDFSCluster.getNameNode();
-    conf.set("fs.default.name", "hdfs://"
-        + nn.getNameNodeAddress().getHostName() + ":"
-        + nn.getNameNodeAddress().getPort());
+    JobConf conf = new JobConf(miniMRCluster.createJobConf());
+    conf.setMemoryForMapTask(PER_TASK_LIMIT);
+    conf.setMemoryForReduceTask(PER_TASK_LIMIT);
+    conf.setMaxMapAttempts(1);
+    conf.setMaxReduceAttempts(1);
 
     // Start the job.
     boolean success = true;
@@ -335,48 +277,39 @@ public class TestTaskTrackerMemoryManager extends TestCase {
     }
 
     // Large enough for SleepJob Tasks.
-    long PER_TASK_LIMIT = 100000000000L;
-    // Very Limited TT. All tasks will be killed.
-    long TASK_TRACKER_LIMIT = 100L;
-    Pattern taskOverLimitPattern =
-        Pattern.compile(String.format(taskOverLimitPatternString, String
-            .valueOf(PER_TASK_LIMIT)));
-    Pattern trackerOverLimitPattern =
-        Pattern
-            .compile("Killing one of the least progress tasks - .*, as "
-                + "the cumulative memory usage of all the tasks on the TaskTracker"
-                + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + ".");
-    Matcher mat = null;
+    long PER_TASK_LIMIT = 100 * 1024L;
 
     // Start cluster with proper configuration.
     JobConf fConf = new JobConf();
-    fConf.setClass(
-        TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
-        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
-    fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        TASK_TRACKER_LIMIT);
-    fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        TASK_TRACKER_LIMIT);
+    fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+        1L);
     fConf.setLong(
-        TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1L);
+
+    // Because of the above, the total tt limit is 2mb
+    long TASK_TRACKER_LIMIT = 2 * 1024 * 1024L;
+
     // very small value, so that no task escapes to successful completion.
     fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
         String.valueOf(300));
 
     startCluster(fConf);
 
+    Pattern taskOverLimitPattern =
+      Pattern.compile(String.format(taskOverLimitPatternString, String
+          .valueOf(PER_TASK_LIMIT)));
+
+    Pattern trackerOverLimitPattern =
+      Pattern
+          .compile("Killing one of the least progress tasks - .*, as "
+              + "the cumulative memory usage of all the tasks on the TaskTracker"
+              + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + ".");
+    Matcher mat = null;
+
     // Set up job.
-    JobConf conf = new JobConf();
-    conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
-    JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
-    conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
-        + jt.getTrackerPort());
-    NameNode nn = miniDFSCluster.getNameNode();
-    conf.set("fs.default.name", "hdfs://"
-        + nn.getNameNodeAddress().getHostName() + ":"
-        + nn.getNameNodeAddress().getPort());
+    JobConf conf = new JobConf(miniMRCluster.createJobConf());
+    conf.setMemoryForMapTask(PER_TASK_LIMIT);
+    conf.setMemoryForReduceTask(PER_TASK_LIMIT);
 
     JobClient jClient = new JobClient(conf);
     SleepJob sleepJob = new SleepJob();
@@ -386,9 +319,12 @@ public class TestTaskTrackerMemoryManager extends TestCase {
         jClient.submitJob(sleepJob.setupJobConf(1, 1, 5000, 1, 1000, 1));
     boolean TTOverFlowMsgPresent = false;
     while (true) {
-      // Set-up tasks are the first to be launched.
-      TaskReport[] setUpReports = jt.getSetupTaskReports(job.getID());
-      for (TaskReport tr : setUpReports) {
+      List<TaskReport> allTaskReports = new ArrayList<TaskReport>();
+      allTaskReports.addAll(Arrays.asList(jClient
+          .getSetupTaskReports((org.apache.hadoop.mapred.JobID) job.getID())));
+      allTaskReports.addAll(Arrays.asList(jClient
+          .getMapTaskReports((org.apache.hadoop.mapred.JobID) job.getID())));
+      for (TaskReport tr : allTaskReports) {
         String[] diag = tr.getDiagnostics();
         for (String str : diag) {
           mat = taskOverLimitPattern.matcher(str);