Browse Source

MAPREDUCE-2107. [Gridmix] Total heap usage emulation in Gridmix. (Amar Kamat and Ravi Gummadi via amarrk)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1136000 13f79535-47bb-0310-9956-ffa450edef68
Amar Kamat 14 years ago
parent
commit
a732ab3804

+ 3 - 0
mapreduce/CHANGES.txt

@@ -11,6 +11,9 @@ Trunk (unreleased changes)
 
 
   NEW FEATURES
   NEW FEATURES
 
 
+    MAPREDUCE-2107. [Gridmix] Total heap usage emulation in Gridmix.
+    (Amar Kamat and Ravi Gummadi via amarrk)
+
     MAPREDUCE-2106. [Gridmix] Cumulative CPU usage emulation in Gridmix. 
     MAPREDUCE-2106. [Gridmix] Cumulative CPU usage emulation in Gridmix. 
     (amarrk)
     (amarrk)
 
 

+ 89 - 0
mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java

@@ -19,12 +19,15 @@ package org.apache.hadoop.mapred.gridmix;
 
 
 import java.io.DataOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Formatter;
 import java.util.Formatter;
 import java.util.List;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.security.PrivilegedExceptionAction;
 import java.security.PrivilegedExceptionAction;
 
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
@@ -86,6 +89,11 @@ abstract class GridmixJob implements Callable<Job>, Delayed {
   // configuration key to enable/disable High-Ram feature emulation
   // configuration key to enable/disable High-Ram feature emulation
   static final String GRIDMIX_HIGHRAM_EMULATION_ENABLE = 
   static final String GRIDMIX_HIGHRAM_EMULATION_ENABLE = 
     "gridmix.highram-emulation.enable";
     "gridmix.highram-emulation.enable";
+  // configuration key to enable/disable task jvm options
+  static final String GRIDMIX_TASK_JVM_OPTIONS_ENABLE = 
+    "gridmix.task.jvm-options.enable";
+  private static final Pattern maxHeapPattern = 
+    Pattern.compile("-Xmx[0-9]+[kKmMgGtT]?+");
 
 
   private static void setJobQueue(Job job, String queue) {
   private static void setJobQueue(Job job, String queue) {
     if (queue != null) {
     if (queue != null) {
@@ -137,6 +145,19 @@ abstract class GridmixJob implements Callable<Job>, Delayed {
                                        ret.getConfiguration());
                                        ret.getConfiguration());
           }
           }
           
           
+          // configure task jvm options if enabled
+          // this knob can be turned off if there is a mismatch between the
+          // target (simulation) cluster and the original cluster. Such a 
+          // mismatch can result in job failures (due to memory issues) on the 
+          // target (simulated) cluster.
+          //
+          // TODO If configured, scale the original task's JVM (heap related)
+          //      options to suit the target (simulation) cluster
+          if (conf.getBoolean(GRIDMIX_TASK_JVM_OPTIONS_ENABLE, true)) {
+            configureTaskJVMOptions(jobdesc.getJobConf(), 
+                                    ret.getConfiguration());
+          }
+          
           return ret;
           return ret;
         }
         }
       });
       });
@@ -148,6 +169,74 @@ abstract class GridmixJob implements Callable<Job>, Delayed {
         submissionMillis, TimeUnit.MILLISECONDS);
         submissionMillis, TimeUnit.MILLISECONDS);
     outdir = new Path(outRoot, "" + seq);
     outdir = new Path(outRoot, "" + seq);
   }
   }
+  
+  @SuppressWarnings("deprecation")
+  protected static void configureTaskJVMOptions(Configuration originalJobConf,
+                                                Configuration simulatedJobConf){
+    // Get the heap related java opts used for the original job and set the 
+    // same for the simulated job.
+    //    set task heap options
+    configureTaskJVMMaxHeapOptions(originalJobConf, simulatedJobConf, 
+                                   JobConf.MAPRED_TASK_JAVA_OPTS);
+    //  set map task heap options
+    configureTaskJVMMaxHeapOptions(originalJobConf, simulatedJobConf, 
+                                   MRJobConfig.MAP_JAVA_OPTS);
+
+    //  set reduce task heap options
+    configureTaskJVMMaxHeapOptions(originalJobConf, simulatedJobConf, 
+                                   MRJobConfig.REDUCE_JAVA_OPTS);
+  }
+  
+  // Configures the task's max heap options using the specified key
+  private static void configureTaskJVMMaxHeapOptions(Configuration srcConf, 
+                                                     Configuration destConf,
+                                                     String key) {
+    String srcHeapOpts = srcConf.get(key);
+    if (srcHeapOpts != null) {
+      List<String> srcMaxOptsList = new ArrayList<String>();
+      // extract the max heap options and ignore the rest
+      extractMaxHeapOpts(srcHeapOpts, srcMaxOptsList, 
+                         new ArrayList<String>());
+      if (srcMaxOptsList.size() > 0) {
+        List<String> destOtherOptsList = new ArrayList<String>();
+        // extract the other heap options and ignore the max options in the 
+        // destination configuration
+        String destHeapOpts = destConf.get(key);
+        if (destHeapOpts != null) {
+          extractMaxHeapOpts(destHeapOpts, new ArrayList<String>(), 
+                             destOtherOptsList);
+        }
+        
+        // the source configuration might have some task level max heap opts set
+        // remove these opts from the destination configuration and replace
+        // with the options set in the original configuration
+        StringBuilder newHeapOpts = new StringBuilder();
+        
+        for (String otherOpt : destOtherOptsList) {
+          newHeapOpts.append(otherOpt).append(" ");
+        }
+        
+        for (String opts : srcMaxOptsList) {
+          newHeapOpts.append(opts).append(" ");
+        }
+        
+        // set the final heap opts 
+        destConf.set(key, newHeapOpts.toString().trim());
+      }
+    }
+  }
+  
+  private static void extractMaxHeapOpts(String javaOptions,  
+      List<String> maxOpts,  List<String> others) {
+    for (String opt : javaOptions.split(" ")) {
+      Matcher matcher = maxHeapPattern.matcher(opt);
+      if (matcher.find()) {
+        maxOpts.add(opt);
+      } else {
+        others.add(opt);
+      }
+    }
+  }
 
 
   // Scales the desired job-level configuration parameter. This API makes sure 
   // Scales the desired job-level configuration parameter. This API makes sure 
   // that the ratio of the job level configuration parameter to the cluster 
   // that the ratio of the job level configuration parameter to the cluster 

+ 4 - 4
mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/CumulativeCpuUsageEmulatorPlugin.java

@@ -34,7 +34,7 @@ import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
  * 
  * 
  * <p>{@link CumulativeCpuUsageEmulatorPlugin} emulates the CPU usage in steps. 
  * <p>{@link CumulativeCpuUsageEmulatorPlugin} emulates the CPU usage in steps. 
  * The frequency of emulation can be configured via 
  * The frequency of emulation can be configured via 
- * {@link #CPU_EMULATION_FREQUENCY}.
+ * {@link #CPU_EMULATION_PROGRESS_INTERVAL}.
  * CPU usage values are matched via emulation only on the interval boundaries.
  * CPU usage values are matched via emulation only on the interval boundaries.
  * </p>
  * </p>
  *  
  *  
@@ -70,8 +70,8 @@ implements ResourceUsageEmulatorPlugin {
   private long lastSeenCpuUsageCpuUsage = 0;
   private long lastSeenCpuUsageCpuUsage = 0;
   
   
   // Configuration parameters
   // Configuration parameters
-  public static final String CPU_EMULATION_FREQUENCY = 
-    "gridmix.emulators.resource-usage.cpu.frequency";
+  public static final String CPU_EMULATION_PROGRESS_INTERVAL = 
+    "gridmix.emulators.resource-usage.cpu.emulation-interval";
   private static final float DEFAULT_EMULATION_FREQUENCY = 0.1F; // 10 times
   private static final float DEFAULT_EMULATION_FREQUENCY = 0.1F; // 10 times
 
 
   /**
   /**
@@ -302,7 +302,7 @@ implements ResourceUsageEmulatorPlugin {
     
     
     this.monitor = monitor;
     this.monitor = monitor;
     this.progress = progress;
     this.progress = progress;
-    emulationInterval =  conf.getFloat(CPU_EMULATION_FREQUENCY, 
+    emulationInterval =  conf.getFloat(CPU_EMULATION_PROGRESS_INTERVAL, 
                                        DEFAULT_EMULATION_FREQUENCY);
                                        DEFAULT_EMULATION_FREQUENCY);
     
     
     // calibrate the core cpu-usage utility
     // calibrate the core cpu-usage utility

+ 258 - 0
mapreduce/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/resourceusage/TotalHeapUsageEmulatorPlugin.java

@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix.emulators.resourceusage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.gridmix.Progressive;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
+
+/**
+ * <p>A {@link ResourceUsageEmulatorPlugin} that emulates the total heap 
+ * usage by loading the JVM heap memory. Adding smaller chunks of data to the 
+ * heap will essentially use up some heap space, thus forcing the JVM to expand 
+ * its heap and resulting in an increase in the heap usage.</p>
+ * 
+ * <p>{@link TotalHeapUsageEmulatorPlugin} emulates the heap usage in steps. 
+ * The frequency of emulation can be configured via 
+ * {@link #HEAP_EMULATION_PROGRESS_INTERVAL}.
+ * Heap usage values are matched via emulation only at specific interval 
+ * boundaries.
+ * </p>
+ *  
+ * {@link TotalHeapUsageEmulatorPlugin} is a wrapper program for managing 
+ * the heap usage emulation feature. It internally uses an emulation algorithm 
+ * (called the core and described by {@link HeapUsageEmulatorCore}) for 
+ * performing the actual emulation. Multiple calls to this core engine should 
+ * use up some amount of heap.
+ */
+public class TotalHeapUsageEmulatorPlugin 
+implements ResourceUsageEmulatorPlugin {
+  // Configuration parameters
+  //  the core engine to emulate heap usage
+  protected HeapUsageEmulatorCore emulatorCore;
+  //  the progress bar
+  private Progressive progress;
+  //  decides if this plugin can emulate heap usage or not
+  private boolean enabled = true;
+  //  the progress boundaries/interval where emulation should be done
+  private float emulationInterval;
+  //  target heap usage to emulate
+  private long targetHeapUsageInMB = 0;
+  
+  /**
+   * The frequency (based on task progress) with which memory-emulation code is
+   * run. If the value is set to 0.1 then the emulation will happen at every 
+   * 10% of the task's progress. The default value of this parameter is 
+   * {@link #DEFAULT_EMULATION_PROGRESS_INTERVAL}.
+   */
+  public static final String HEAP_EMULATION_PROGRESS_INTERVAL = 
+    "gridmix.emulators.resource-usage.heap.emulation-interval";
+  
+  // Default value for emulation interval
+  private static final float DEFAULT_EMULATION_PROGRESS_INTERVAL = 0.1F; // 10 %
+
+  private float prevEmulationProgress = 0F;
+  
+  /**
+   * The minimum buffer reserved for other non-emulation activities.
+   */
+  public static final String MIN_HEAP_FREE_RATIO = 
+    "gridmix.emulators.resource-usage.heap.min-free-ratio";
+  
+  private float minFreeHeapRatio;
+  
+  private static final float DEFAULT_MIN_FREE_HEAP_RATIO = 0.3F;
+  
+  /**
+   * Determines the unit increase per call to the core engine's load API. This
+   * is expressed as a fraction of the difference between the expected total 
+   * heap usage and the current usage. 
+   */
+  public static final String HEAP_LOAD_RATIO = 
+    "gridmix.emulators.resource-usage.heap.load-ratio";
+  
+  private float heapLoadRatio;
+  
+  private static final float DEFAULT_HEAP_LOAD_RATIO = 0.1F;
+  
+  public static int ONE_MB = 1024 * 1024;
+  
+  /**
+   * Defines the core heap usage emulation algorithm. This engine is expected
+   * to perform certain memory intensive operations to consume some
+   * amount of heap. {@link #load(long)} should load the current heap and 
+   * increase the heap usage by the specified value. This core engine can be 
+   * initialized using the {@link #initialize(ResourceCalculatorPlugin, long)} 
+   * API to suit the underlying hardware better.
+   */
+  public interface HeapUsageEmulatorCore {
+    /**
+     * Performs some memory intensive operations to use up some heap.
+     */
+    public void load(long sizeInMB);
+    
+    /**
+     * Initialize the core.
+     */
+    public void initialize(ResourceCalculatorPlugin monitor, 
+                           long totalHeapUsageInMB);
+    
+    /**
+     * Reset the resource usage
+     */
+    public void reset();
+  }
+  
+  /**
+   * This is the core engine to emulate the heap usage. The only responsibility 
+   * of this class is to perform certain memory intensive operations to make 
+   * sure that some desired value of heap is used.
+   */
+  public static class DefaultHeapUsageEmulator 
+  implements HeapUsageEmulatorCore {
+    // store the unit loads in a list
+    protected static ArrayList<Object> heapSpace = new ArrayList<Object>();
+    
+    /**
+     * Increase heap usage by current process by the given amount.
+     * This is done by creating objects each of size 1MB.
+     */
+    public void load(long sizeInMB) {
+      for (long i = 0; i < sizeInMB; ++i) {
+        // Allocate another byte array of size 1MB
+        heapSpace.add((Object)new byte[ONE_MB]);
+      }
+    }
+    
+    /**
+     * This will initialize the core and check if the core can emulate the 
+     * desired target on the underlying hardware.
+     */
+    public void initialize(ResourceCalculatorPlugin monitor, 
+                           long totalHeapUsageInMB) {
+      long maxPhysicalMemoryInMB = monitor.getPhysicalMemorySize() / ONE_MB ;
+      if(maxPhysicalMemoryInMB < totalHeapUsageInMB) {
+        throw new RuntimeException("Total heap the can be used is " 
+            + maxPhysicalMemoryInMB 
+            + " bytes while the emulator is configured to emulate a total of " 
+            + totalHeapUsageInMB + " bytes");
+      }
+    }
+    
+    /**
+     * Clear references to all the GridMix-allocated special objects so that 
+     * heap usage is reduced.
+     */
+    @Override
+    public void reset() {
+      heapSpace.clear();
+    }
+  }
+  
+  public TotalHeapUsageEmulatorPlugin() {
+    this(new DefaultHeapUsageEmulator());
+  }
+  
+  /**
+   * For testing.
+   */
+  public TotalHeapUsageEmulatorPlugin(HeapUsageEmulatorCore core) {
+    emulatorCore = core;
+  }
+  
+  protected long getTotalHeapUsageInMB() {
+    return Runtime.getRuntime().totalMemory() / ONE_MB;
+  }
+  
+  protected long getMaxHeapUsageInMB() {
+    return Runtime.getRuntime().maxMemory() / ONE_MB;
+  }
+  
+  @Override
+  public void emulate() throws IOException, InterruptedException {
+    if (enabled) {
+      float currentProgress = progress.getProgress();
+      if (prevEmulationProgress < currentProgress 
+          && ((currentProgress - prevEmulationProgress) >= emulationInterval
+              || currentProgress == 1)) {
+
+        long maxHeapSizeInMB = getMaxHeapUsageInMB();
+        long committedHeapSizeInMB = getTotalHeapUsageInMB();
+        
+        // Increase committed heap usage, if needed
+        // Using a linear weighing function for computing the expected usage
+        long expectedHeapUsageInMB = 
+          Math.min(maxHeapSizeInMB,
+                   (long) (targetHeapUsageInMB * currentProgress));
+        if (expectedHeapUsageInMB < maxHeapSizeInMB
+            && committedHeapSizeInMB < expectedHeapUsageInMB) {
+          long bufferInMB = (long)(minFreeHeapRatio * expectedHeapUsageInMB);
+          long currentDifferenceInMB = 
+            expectedHeapUsageInMB - committedHeapSizeInMB;
+          long currentIncrementLoadSizeInMB = 
+                (long)(currentDifferenceInMB * heapLoadRatio);
+          // Make sure that at least 1 MB is incremented.
+          currentIncrementLoadSizeInMB = 
+            Math.max(1, currentIncrementLoadSizeInMB);
+          while (committedHeapSizeInMB + bufferInMB < expectedHeapUsageInMB) {
+            // add blocks in order of X% of the difference, X = 10% by default
+            emulatorCore.load(currentIncrementLoadSizeInMB);
+            committedHeapSizeInMB = getTotalHeapUsageInMB();
+          }
+        }
+        
+        // store the emulation progress boundary
+        prevEmulationProgress = currentProgress;
+      }
+      
+      // reset the core so that the garbage is reclaimed
+      emulatorCore.reset();
+    }
+  }
+
+  @Override
+  public void initialize(Configuration conf, ResourceUsageMetrics metrics,
+                         ResourceCalculatorPlugin monitor,
+                         Progressive progress) {
+    // get the target heap usage
+    targetHeapUsageInMB = metrics.getHeapUsage() / ONE_MB;
+    if (targetHeapUsageInMB <= 0 ) {
+      enabled = false;
+      return;
+    } else {
+      // calibrate the core heap-usage utility
+      emulatorCore.initialize(monitor, targetHeapUsageInMB);
+      enabled = true;
+    }
+    
+    this.progress = progress;
+    emulationInterval = 
+      conf.getFloat(HEAP_EMULATION_PROGRESS_INTERVAL, 
+                    DEFAULT_EMULATION_PROGRESS_INTERVAL);
+    
+    minFreeHeapRatio = conf.getFloat(MIN_HEAP_FREE_RATIO, 
+                                     DEFAULT_MIN_FREE_HEAP_RATIO);
+    
+    heapLoadRatio = conf.getFloat(HEAP_LOAD_RATIO, DEFAULT_HEAP_LOAD_RATIO);
+    
+    prevEmulationProgress = 0;
+  }
+}

+ 453 - 0
mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixMemoryEmulation.java

@@ -0,0 +1,453 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.DummyResourceCalculatorPlugin;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.gridmix.DebugJobProducer.MockJob;
+import org.apache.hadoop.mapred.gridmix.TestHighRamJob.DummyGridmixJob;
+import org.apache.hadoop.mapred.gridmix.TestResourceUsageEmulators.FakeProgressive;
+import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.TotalHeapUsageEmulatorPlugin;
+import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.TotalHeapUsageEmulatorPlugin.DefaultHeapUsageEmulator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
+
+/**
+ * Test Gridmix memory emulation.
+ */
+public class TestGridmixMemoryEmulation {
+  /**
+   * This is a dummy class that fakes heap usage.
+   */
+  private static class FakeHeapUsageEmulatorCore 
+  extends DefaultHeapUsageEmulator {
+    private int numCalls = 0;
+    
+    @Override
+    public void load(long sizeInMB) {
+      ++numCalls;
+      super.load(sizeInMB);
+    }
+    
+    // Get the total number of times load() was invoked
+    int getNumCalls() {
+      return numCalls;
+    }
+    
+    // Get the total number of 1mb objects stored within
+    long getHeapUsageInMB() {
+      return heapSpace.size();
+    }
+    
+    @Override
+    public void reset() {
+      // no op to stop emulate() from resetting
+    }
+    
+    /**
+     * For re-testing purpose.
+     */
+    void resetFake() {
+      numCalls = 0;
+      super.reset();
+    }
+  }
+
+  /**
+   * This is a dummy class that fakes the heap usage emulator plugin.
+   */
+  private static class FakeHeapUsageEmulatorPlugin 
+  extends TotalHeapUsageEmulatorPlugin {
+    private FakeHeapUsageEmulatorCore core;
+    
+    public FakeHeapUsageEmulatorPlugin(FakeHeapUsageEmulatorCore core) {
+      super(core);
+      this.core = core;
+    }
+    
+    @Override
+    protected long getMaxHeapUsageInMB() {
+      return Long.MAX_VALUE / ONE_MB;
+    }
+    
+    @Override
+    protected long getTotalHeapUsageInMB() {
+      return core.getHeapUsageInMB();
+    }
+  }
+  
+  /**
+   * Test {@link TotalHeapUsageEmulatorPlugin}'s core heap usage emulation 
+   * engine.
+   */
+  @Test
+  public void testHeapUsageEmulator() throws IOException {
+    FakeHeapUsageEmulatorCore heapEmulator = new FakeHeapUsageEmulatorCore();
+    
+    long testSizeInMB = 10; // 10 mb
+    long previousHeap = heapEmulator.getHeapUsageInMB();
+    heapEmulator.load(testSizeInMB);
+    long currentHeap = heapEmulator.getHeapUsageInMB();
+    
+    // check if the heap has increased by expected value
+    assertEquals("Default heap emulator failed to load 10mb", 
+                 previousHeap + testSizeInMB, currentHeap);
+    
+    // test reset
+    heapEmulator.resetFake();
+    assertEquals("Default heap emulator failed to reset", 
+                 0, heapEmulator.getHeapUsageInMB());
+  }
+
+  /**
+   * Test {@link TotalHeapUsageEmulatorPlugin}.
+   */
+  @Test
+  public void testTotalHeapUsageEmulatorPlugin() throws Exception {
+    Configuration conf = new Configuration();
+    // set the dummy resource calculator for testing
+    ResourceCalculatorPlugin monitor = new DummyResourceCalculatorPlugin();
+    long maxHeapUsage = 1024 * TotalHeapUsageEmulatorPlugin.ONE_MB; // 1GB
+    conf.setLong(DummyResourceCalculatorPlugin.MAXPMEM_TESTING_PROPERTY, 
+                 maxHeapUsage);
+    monitor.setConf(conf);
+    
+    // no buffer to be reserved
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.MIN_HEAP_FREE_RATIO, 0F);
+    // only 1 call to be made per cycle
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_LOAD_RATIO, 1F);
+    long targetHeapUsageInMB = 200; // 200mb
+    
+    // fake progress indicator
+    FakeProgressive fakeProgress = new FakeProgressive();
+    
+    // fake heap usage generator
+    FakeHeapUsageEmulatorCore fakeCore = new FakeHeapUsageEmulatorCore();
+    
+    // a heap usage emulator with fake core
+    FakeHeapUsageEmulatorPlugin heapPlugin = 
+      new FakeHeapUsageEmulatorPlugin(fakeCore);
+    
+    // test with invalid or missing resource usage value
+    ResourceUsageMetrics invalidUsage = 
+      TestResourceUsageEmulators.createMetrics(0);
+    heapPlugin.initialize(conf, invalidUsage, null, null);
+    
+    // test if disabled heap emulation plugin's emulate() call is a no-operation
+    // this will test if the emulation plugin is disabled or not
+    int numCallsPre = fakeCore.getNumCalls();
+    long heapUsagePre = fakeCore.getHeapUsageInMB();
+    heapPlugin.emulate();
+    int numCallsPost = fakeCore.getNumCalls();
+    long heapUsagePost = fakeCore.getHeapUsageInMB();
+    
+    //  test if no calls are made to the heap usage emulator core
+    assertEquals("Disabled heap usage emulation plugin works!", 
+                 numCallsPre, numCallsPost);
+    //  test if the heap usage reported by the emulator core is unchanged
+    assertEquals("Disabled heap usage emulation plugin works!", 
+                 heapUsagePre, heapUsagePost);
+    
+    // test with wrong/invalid configuration
+    Boolean failed = null;
+    invalidUsage = 
+      TestResourceUsageEmulators.createMetrics(maxHeapUsage 
+                                   + TotalHeapUsageEmulatorPlugin.ONE_MB);
+    try {
+      heapPlugin.initialize(conf, invalidUsage, monitor, null);
+      failed = false;
+    } catch (Exception e) {
+      failed = true;
+    }
+    assertNotNull("Fail case failure!", failed);
+    assertTrue("Expected failure!", failed); 
+    
+    // test with valid resource usage value
+    ResourceUsageMetrics metrics = 
+      TestResourceUsageEmulators.createMetrics(targetHeapUsageInMB 
+                                   * TotalHeapUsageEmulatorPlugin.ONE_MB);
+    
+    // test with default emulation interval
+    // in every interval, the emulator will add 100% of the expected usage 
+    // (since gridmix.emulators.resource-usage.heap.load-ratio=1)
+    // so at 10%, emulator will add 10% (difference), at 20% it will add 10% ...
+    // So to emulate 200MB, it will add
+    //   20mb + 20mb + 20mb + 20mb + .. = 200mb 
+    testEmulationAccuracy(conf, fakeCore, monitor, metrics, heapPlugin, 200, 
+                          10);
+    
+    // test with custom value for emulation interval of 20%
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_EMULATION_PROGRESS_INTERVAL,
+                  0.2F);
+    //  40mb + 40mb + 40mb + 40mb + 40mb = 200mb
+    testEmulationAccuracy(conf, fakeCore, monitor, metrics, heapPlugin, 200, 5);
+    
+    // test with custom value of free heap ratio and load ratio = 1
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_LOAD_RATIO, 1F);
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.MIN_HEAP_FREE_RATIO, 0.5F);
+    //  40mb + 0mb + 80mb + 0mb + 0mb = 120mb
+    testEmulationAccuracy(conf, fakeCore, monitor, metrics, heapPlugin, 120, 2);
+    
+    // test with custom value of heap load ratio and min free heap ratio = 0
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_LOAD_RATIO, 0.5F);
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.MIN_HEAP_FREE_RATIO, 0F);
+    // 20mb (call#1) + 20mb (call#1) + 20mb (call#2) + 20mb (call#2) +.. = 200mb
+    testEmulationAccuracy(conf, fakeCore, monitor, metrics, heapPlugin, 200, 
+                          10);
+    
+    // test with custom value of free heap ratio = 0.25 and load ratio = 0.5
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.MIN_HEAP_FREE_RATIO, 0.25F);
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_LOAD_RATIO, 0.5F);
+    // 20mb (call#1) + 20mb (call#1) + 30mb (call#2) + 0mb (call#2) 
+    // + 30mb (call#3) + 0mb (call#3) + 35mb (call#4) + 0mb (call#4)
+    // + 37mb (call#5) + 0mb (call#5) = 162mb
+    testEmulationAccuracy(conf, fakeCore, monitor, metrics, heapPlugin, 162, 6);
+    
+    // test if emulation interval boundary is respected
+    fakeProgress = new FakeProgressive(); // initialize
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.MIN_HEAP_FREE_RATIO, 0F);
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_LOAD_RATIO, 1F);
+    conf.setFloat(TotalHeapUsageEmulatorPlugin.HEAP_EMULATION_PROGRESS_INTERVAL,
+                  0.25F);
+    heapPlugin.initialize(conf, metrics, monitor, fakeProgress);
+    fakeCore.resetFake();
+    // take a snapshot after the initialization
+    long initHeapUsage = fakeCore.getHeapUsageInMB();
+    long initNumCallsUsage = fakeCore.getNumCalls();
+    // test with 0 progress
+    testEmulationBoundary(0F, fakeCore, fakeProgress, heapPlugin, initHeapUsage, 
+                          initNumCallsUsage, "[no-op, 0 progress]");
+    // test with 24% progress
+    testEmulationBoundary(0.24F, fakeCore, fakeProgress, heapPlugin, 
+                          initHeapUsage, initNumCallsUsage, 
+                          "[no-op, 24% progress]");
+    // test with 25% progress
+    testEmulationBoundary(0.25F, fakeCore, fakeProgress, heapPlugin, 
+        targetHeapUsageInMB / 4, 1, "[op, 25% progress]");
+    // test with 80% progress
+    testEmulationBoundary(0.80F, fakeCore, fakeProgress, heapPlugin, 
+        (targetHeapUsageInMB * 4) / 5, 2, "[op, 80% progress]");
+    
+    // now test if the final call with 100% progress ramps up the heap usage
+    testEmulationBoundary(1F, fakeCore, fakeProgress, heapPlugin, 
+        targetHeapUsageInMB, 3, "[op, 100% progress]");
+  }
+
+  // test whether the heap usage emulator achieves the desired target using
+  // the expected number of calls to the underlying core engine.
+  private static void testEmulationAccuracy(Configuration conf, 
+                        FakeHeapUsageEmulatorCore fakeCore,
+                        ResourceCalculatorPlugin monitor,
+                        ResourceUsageMetrics metrics,
+                        TotalHeapUsageEmulatorPlugin heapPlugin,
+                        long expectedTotalHeapUsageInMB,
+                        long expectedTotalNumCalls)
+  throws Exception {
+    FakeProgressive fakeProgress = new FakeProgressive();
+    fakeCore.resetFake();
+    heapPlugin.initialize(conf, metrics, monitor, fakeProgress);
+    int numLoops = 0;
+    while (fakeProgress.getProgress() < 1) {
+      ++numLoops;
+      float progress = numLoops / 100.0F;
+      fakeProgress.setProgress(progress);
+      heapPlugin.emulate();
+    }
+    
+    // test if the resource plugin shows the expected usage
+    assertEquals("Cumulative heap usage emulator plugin failed (total usage)!", 
+                 expectedTotalHeapUsageInMB, fakeCore.getHeapUsageInMB(), 1L);
+    // test if the resource plugin shows the expected num calls
+    assertEquals("Cumulative heap usage emulator plugin failed (num calls)!", 
+                 expectedTotalNumCalls, fakeCore.getNumCalls(), 0L);
+  }
+
+  // tests if the heap usage emulation plugin emulates only at the expected
+  // progress gaps
+  private static void testEmulationBoundary(float progress, 
+      FakeHeapUsageEmulatorCore fakeCore, FakeProgressive fakeProgress, 
+      TotalHeapUsageEmulatorPlugin heapPlugin, long expectedTotalHeapUsageInMB, 
+      long expectedTotalNumCalls, String info) throws Exception {
+    fakeProgress.setProgress(progress);
+    heapPlugin.emulate();
+    // test heap usage
+    assertEquals("Emulation interval test for heap usage failed " + info + "!", 
+                 expectedTotalHeapUsageInMB, fakeCore.getHeapUsageInMB(), 0L);
+    // test num calls
+    assertEquals("Emulation interval test for heap usage failed " + info + "!", 
+                 expectedTotalNumCalls, fakeCore.getNumCalls(), 0L);
+  }
+  
+  /**
+   * Test the specified task java heap options.
+   *
+   * Builds an "original" configuration from the map/reduce/task options and
+   * a "simulated" configuration from the default map/reduce/task options
+   * (a null value leaves the corresponding key unset), passes the pair to
+   * {@code GridmixJob.configureTaskJVMOptions(originalConf, simulatedConf)}
+   * and then compares the java options found in the simulated configuration
+   * with the expected values.
+   *
+   * @param mapOptions original job's map java options, or null
+   * @param reduceOptions original job's reduce java options, or null
+   * @param taskOptions original job's task java options, or null
+   * @param defaultMapOptions simulated job's initial map java options, 
+   *                          or null
+   * @param defaultReduceOptions simulated job's initial reduce java options,
+   *                             or null
+   * @param defaultTaskOptions simulated job's initial task java options, 
+   *                           or null
+   * @param expectedMapOptions expected map java options of the simulated 
+   *                           configuration (null if expected to be unset)
+   * @param expectedReduceOptions expected reduce java options of the 
+   *                              simulated configuration (null if expected 
+   *                              to be unset)
+   * @param expectedTaskOptions expected task java options of the simulated 
+   *                            configuration (null if expected to be unset)
+   */
+  @SuppressWarnings("deprecation")
+  private void testJavaHeapOptions(String mapOptions, 
+      String reduceOptions, String taskOptions, String defaultMapOptions, 
+      String defaultReduceOptions, String defaultTaskOptions, 
+      String expectedMapOptions, String expectedReduceOptions, 
+      String expectedTaskOptions) throws Exception {
+    Configuration simulatedConf = new Configuration();
+    // reset the configuration parameters (drop any existing java-opts values)
+    simulatedConf.unset(MRJobConfig.MAP_JAVA_OPTS);
+    simulatedConf.unset(MRJobConfig.REDUCE_JAVA_OPTS);
+    simulatedConf.unset(JobConf.MAPRED_TASK_JAVA_OPTS);
+    
+    // set the default map task options
+    if (defaultMapOptions != null) {
+      simulatedConf.set(MRJobConfig.MAP_JAVA_OPTS, defaultMapOptions);
+    }
+    // set the default reduce task options
+    if (defaultReduceOptions != null) {
+      simulatedConf.set(MRJobConfig.REDUCE_JAVA_OPTS, defaultReduceOptions);
+    }
+    // set the default task options
+    if (defaultTaskOptions != null) {
+      simulatedConf.set(JobConf.MAPRED_TASK_JAVA_OPTS, defaultTaskOptions);
+    }
+    
+    Configuration originalConf = new Configuration();
+    // reset the configuration parameters (drop any existing java-opts values)
+    originalConf.unset(MRJobConfig.MAP_JAVA_OPTS);
+    originalConf.unset(MRJobConfig.REDUCE_JAVA_OPTS);
+    originalConf.unset(JobConf.MAPRED_TASK_JAVA_OPTS);
+    
+    // set the map task options
+    if (mapOptions != null) {
+      originalConf.set(MRJobConfig.MAP_JAVA_OPTS, mapOptions);
+    }
+    // set the reduce task options
+    if (reduceOptions != null) {
+      originalConf.set(MRJobConfig.REDUCE_JAVA_OPTS, reduceOptions);
+    }
+    // set the task options
+    if (taskOptions != null) {
+      originalConf.set(JobConf.MAPRED_TASK_JAVA_OPTS, taskOptions);
+    }
+    
+    // configure the task jvm's heap options (only simulatedConf is checked below)
+    GridmixJob.configureTaskJVMOptions(originalConf, simulatedConf);
+    
+    assertEquals("Map heap options mismatch!", expectedMapOptions, 
+                 simulatedConf.get(MRJobConfig.MAP_JAVA_OPTS));
+    assertEquals("Reduce heap options mismatch!", expectedReduceOptions, 
+                 simulatedConf.get(MRJobConfig.REDUCE_JAVA_OPTS));
+    assertEquals("Task heap options mismatch!", expectedTaskOptions, 
+                 simulatedConf.get(JobConf.MAPRED_TASK_JAVA_OPTS));
+  }
+  
+  /**
+   * Test task-level java heap options configuration in {@link GridmixJob}.
+   *
+   * Every call below passes nine arguments in three groups of
+   * (map, reduce, task): the original job's java options, the simulated
+   * job's default java options, and the java options expected in the
+   * simulated job's configuration afterwards. A null expectation means the
+   * key is expected to stay unset.
+   *
+   * The expectations encode that only the -Xmx setting of the original job
+   * is carried over (an original -Xms never shows up in the expected value)
+   * and that it takes the place of any -Xmx already present in the defaults.
+   */
+  @Test
+  public void testJavaHeapOptions() throws Exception {
+    // test missing opts
+    testJavaHeapOptions(null, null, null, null, null, null, null, null, 
+                        null);
+    
+    // test original heap opts (-Xms only) and missing default opts
+    testJavaHeapOptions("-Xms10m", "-Xms20m", "-Xms30m", null, null, null,
+                        null, null, null);
+    
+    // test missing opts with default opts (defaults expected unchanged)
+    testJavaHeapOptions(null, null, null, "-Xms10m", "-Xms20m", "-Xms30m",
+                        "-Xms10m", "-Xms20m", "-Xms30m");
+    
+    // test empty option
+    testJavaHeapOptions("", "", "", null, null, null, null, null, null);
+    
+    // test empty default option and no original heap options
+    testJavaHeapOptions(null, null, null, "", "", "", "", "", "");
+    
+    // test empty opts and default opts (defaults expected unchanged)
+    testJavaHeapOptions("", "", "", "-Xmx10m -Xms1m", "-Xmx50m -Xms2m", 
+                        "-Xms2m -Xmx100m", "-Xmx10m -Xms1m", "-Xmx50m -Xms2m", 
+                        "-Xms2m -Xmx100m");
+    
+    // test custom heap opts with no default opts
+    testJavaHeapOptions("-Xmx10m", "-Xmx20m", "-Xmx30m", null, null, null,
+                        "-Xmx10m", "-Xmx20m", "-Xmx30m");
+    
+    // test heap opts with default opts (multiple value, no -Xmx in defaults)
+    testJavaHeapOptions("-Xms5m -Xmx200m", "-Xms15m -Xmx300m", 
+                        "-Xms25m -Xmx50m", "-XXabc", "-XXxyz", "-XXdef", 
+                        "-XXabc -Xmx200m", "-XXxyz -Xmx300m", "-XXdef -Xmx50m");
+    
+    // test heap opts with default opts (multiple value, duplication of -Xmx)
+    testJavaHeapOptions("-Xms5m -Xmx200m", "-Xms15m -Xmx300m", 
+                        "-Xms25m -Xmx50m", "-XXabc -Xmx500m", "-XXxyz -Xmx600m",
+                        "-XXdef -Xmx700m", "-XXabc -Xmx200m", "-XXxyz -Xmx300m",
+                        "-XXdef -Xmx50m");
+    
+    // test heap opts with default opts (single value, no -Xmx in defaults)
+    testJavaHeapOptions("-Xmx10m", "-Xmx20m", "-Xmx50m", "-Xms2m", 
+                        "-Xms3m", "-Xms5m", "-Xms2m -Xmx10m", "-Xms3m -Xmx20m",
+                        "-Xms5m -Xmx50m");
+    
+    // test heap opts with default opts (single value, duplication of -Xmx)
+    testJavaHeapOptions("-Xmx10m", "-Xmx20m", "-Xmx50m", "-Xmx2m", 
+                        "-Xmx3m", "-Xmx5m", "-Xmx10m", "-Xmx20m", "-Xmx50m");
+  }
+  
+  /**
+   * Test disabled task heap options configuration in {@link GridmixJob}.
+   *
+   * Sets {@code GridmixJob.GRIDMIX_TASK_JVM_OPTIONS_ENABLE} to false in the
+   * gridmix configuration, gives the gridmix configuration and the original
+   * job's configuration different -Xmx values, and expects the simulated
+   * job's configuration to still carry the gridmix values (-Xmx1m, -Xmx2m,
+   * -Xmx3m) rather than the original job's ones.
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testJavaHeapOptionsDisabled() throws Exception {
+    Configuration gridmixConf = new Configuration();
+    gridmixConf.setBoolean(GridmixJob.GRIDMIX_TASK_JVM_OPTIONS_ENABLE, false);
+    
+    // set the default values of simulated job
+    gridmixConf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx1m");
+    gridmixConf.set(MRJobConfig.REDUCE_JAVA_OPTS, "-Xmx2m");
+    gridmixConf.set(JobConf.MAPRED_TASK_JAVA_OPTS, "-Xmx3m");
+    
+    // set the default map and reduce task options for original job
+    final JobConf originalConf = new JobConf();
+    originalConf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx10m");
+    originalConf.set(MRJobConfig.REDUCE_JAVA_OPTS, "-Xmx20m");
+    originalConf.set(JobConf.MAPRED_TASK_JAVA_OPTS, "-Xmx30m");
+    
+    // define a mock job whose getJobConf() hands back originalConf
+    MockJob story = new MockJob(originalConf) {
+      public JobConf getJobConf() {
+        return originalConf;
+      }
+    };
+    
+    GridmixJob job = new DummyGridmixJob(gridmixConf, story);
+    Job simulatedJob = job.getJob();
+    Configuration simulatedConf = simulatedJob.getConfiguration();
+    
+    assertEquals("Map heap options works when disabled!", "-Xmx1m", 
+                 simulatedConf.get(MRJobConfig.MAP_JAVA_OPTS));
+    assertEquals("Reduce heap options works when disabled!", "-Xmx2m", 
+                 simulatedConf.get(MRJobConfig.REDUCE_JAVA_OPTS));
+    assertEquals("Task heap options works when disabled!", "-Xmx3m", 
+                 simulatedConf.get(JobConf.MAPRED_TASK_JAVA_OPTS));
+  }
+}

+ 1 - 1
mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestHighRamJob.java

@@ -41,7 +41,7 @@ public class TestHighRamJob {
   /**
   /**
    * A dummy {@link GridmixJob} that opens up the simulated job for testing.
    * A dummy {@link GridmixJob} that opens up the simulated job for testing.
    */
    */
-  private static class DummyGridmixJob extends GridmixJob {
+  protected static class DummyGridmixJob extends GridmixJob {
     public DummyGridmixJob(Configuration conf, JobStory desc) 
     public DummyGridmixJob(Configuration conf, JobStory desc) 
     throws IOException {
     throws IOException {
       super(conf, System.currentTimeMillis(), desc, new Path("test"), 
       super(conf, System.currentTimeMillis(), desc, new Path("test"), 

+ 4 - 4
mapreduce/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestResourceUsageEmulators.java

@@ -427,7 +427,7 @@ public class TestResourceUsageEmulators {
   }
   }
   
   
   // Creates a ResourceUsageMetrics object from the target usage
   // Creates a ResourceUsageMetrics object from the target usage
-  private static ResourceUsageMetrics createMetrics(long target) {
+  static ResourceUsageMetrics createMetrics(long target) {
     ResourceUsageMetrics metrics = new ResourceUsageMetrics();
     ResourceUsageMetrics metrics = new ResourceUsageMetrics();
     metrics.setCumulativeCpuUsage(target);
     metrics.setCumulativeCpuUsage(target);
     metrics.setVirtualMemoryUsage(target);
     metrics.setVirtualMemoryUsage(target);
@@ -487,7 +487,7 @@ public class TestResourceUsageEmulators {
                           targetCpuUsage, targetCpuUsage / unitCpuUsage);
                           targetCpuUsage, targetCpuUsage / unitCpuUsage);
     
     
     // test with custom value for emulation interval of 20%
     // test with custom value for emulation interval of 20%
-    conf.setFloat(CumulativeCpuUsageEmulatorPlugin.CPU_EMULATION_FREQUENCY,
+    conf.setFloat(CumulativeCpuUsageEmulatorPlugin.CPU_EMULATION_PROGRESS_INTERVAL,
                   0.2F);
                   0.2F);
     testEmulationAccuracy(conf, fakeCore, monitor, metrics, cpuPlugin, 
     testEmulationAccuracy(conf, fakeCore, monitor, metrics, cpuPlugin, 
                           targetCpuUsage, targetCpuUsage / unitCpuUsage);
                           targetCpuUsage, targetCpuUsage / unitCpuUsage);
@@ -497,7 +497,7 @@ public class TestResourceUsageEmulators {
     fakeProgress = new FakeProgressive(); // initialize
     fakeProgress = new FakeProgressive(); // initialize
     fakeCore.reset();
     fakeCore.reset();
     fakeCore.setUnitUsage(1);
     fakeCore.setUnitUsage(1);
-    conf.setFloat(CumulativeCpuUsageEmulatorPlugin.CPU_EMULATION_FREQUENCY,
+    conf.setFloat(CumulativeCpuUsageEmulatorPlugin.CPU_EMULATION_PROGRESS_INTERVAL,
                   0.25F);
                   0.25F);
     cpuPlugin.initialize(conf, metrics, monitor, fakeProgress);
     cpuPlugin.initialize(conf, metrics, monitor, fakeProgress);
     // take a snapshot after the initialization
     // take a snapshot after the initialization
@@ -534,7 +534,7 @@ public class TestResourceUsageEmulators {
     fakeProgress = new FakeProgressive(); // initialize
     fakeProgress = new FakeProgressive(); // initialize
     fakeCore.reset();
     fakeCore.reset();
     fakeCore.setUnitUsage(unitCpuUsage);
     fakeCore.setUnitUsage(unitCpuUsage);
-    conf.setFloat(CumulativeCpuUsageEmulatorPlugin.CPU_EMULATION_FREQUENCY,
+    conf.setFloat(CumulativeCpuUsageEmulatorPlugin.CPU_EMULATION_PROGRESS_INTERVAL,
                   0.40F);
                   0.40F);
     cpuPlugin.initialize(conf, metrics, monitor, fakeProgress);
     cpuPlugin.initialize(conf, metrics, monitor, fakeProgress);
     // take a snapshot after the initialization
     // take a snapshot after the initialization

+ 27 - 2
mapreduce/src/docs/src/documentation/content/xdocs/gridmix.xml

@@ -198,6 +198,15 @@ hadoop jar &lt;gridmix-jar&gt; org.apache.hadoop.mapred.gridmix.Gridmix \
           <td>The maximum size of the input files. The default limit is 100
           <td>The maximum size of the input files. The default limit is 100
 	  TiB.</td>
 	  TiB.</td>
         </tr>
         </tr>
+        <tr>
+          <td>
+            <code>gridmix.task.jvm-options.enable</code>
+          </td>
+          <td>Enables Gridmix to configure the simulated task's max heap 
+              options using the values obtained from the original task (i.e. 
+              via the trace).
+          </td>
+        </tr>
       </table>
       </table>
     </section>
     </section>
     <section id="jobtypes">
     <section id="jobtypes">
@@ -703,10 +712,26 @@ hadoop jar &lt;gridmix-jar&gt; org.apache.hadoop.mapred.gridmix.Gridmix \
            CPU usage emulator is designed in such a way that
            CPU usage emulator is designed in such a way that
            it only emulates at specific progress boundaries of the task. This 
            it only emulates at specific progress boundaries of the task. This 
            interval can be configured using 
            interval can be configured using 
-           <code>gridmix.emulators.resource-usage.cpu.frequency</code>. The 
-           default value for this parameter is <code>0.1</code> i.e 
+           <code>gridmix.emulators.resource-usage.cpu.emulation-interval</code>.
+           The default value for this parameter is <code>0.1</code> i.e 
            <code>10%</code>.
            <code>10%</code>.
        </li>
        </li>
+       <li>Total heap usage <em>emulator</em>: 
+           GridMix uses the total heap usage value published by Rumen 
+           and makes sure that the total heap usage of the simulated 
+           task is close to the value published by Rumen. GridMix can be 
+           configured to emulate total heap usage by adding 
+           <code>org.apache.hadoop.mapred.gridmix.emulators.resourceusage.TotalHeapUsageEmulatorPlugin</code> 
+           to the list of emulator <em>plugins</em> configured for the 
+           <code>gridmix.emulators.resource-usage.plugins</code> parameter.
+           The heap usage emulator is designed in such a way that
+           it only emulates at specific progress boundaries of the task. This 
+           interval can be configured using 
+           <code>gridmix.emulators.resource-usage.heap.emulation-interval</code>.
+           The default value for this parameter is <code>0.1</code>, 
+           i.e. a <code>10%</code> progress interval.
+       </li>
      </ul>
      </ul>
      <p>Note that GridMix will emulate resource usages only for jobs of type 
      <p>Note that GridMix will emulate resource usages only for jobs of type 
         <em>LOADJOB</em>.
         <em>LOADJOB</em>.