浏览代码

HADOOP-5241. Fixes a bug in disk-space resource estimation. Makes the estimation formula linear where blowUp = Total-Output/Total-Input. Contributed by Sharad Agarwal.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@746944 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 年之前
父节点
当前提交
531fecc4e9

+ 3 - 0
CHANGES.txt

@@ -851,6 +851,9 @@ Release 0.20.0 - Unreleased
     references to completedJobStore outside the block where the JobTracker is locked.
     (ddas)
 
+    HADOOP-5241. Fixes a bug in disk-space resource estimation. Makes the estimation
+    formula linear where blowUp = Total-Output/Total-Input. (Sharad Agarwal via ddas)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

+ 18 - 54
src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java

@@ -34,82 +34,46 @@ class ResourceEstimator {
   private static final Log LOG = LogFactory.getLog(
       "org.apache.hadoop.mapred.ResourceEstimator");
 
+  private long completedMapsInputSize;
+  private long completedMapsOutputSize;
 
-  /**
-   * Estimated ratio of output to (input size+1) for map tasks. 
-   */
-  private double mapBlowupRatio;
-  
-  /**
-   * How much relative weight to put on the current estimate.
-   * Each completed map has unit weight.
-   */
-  private double estimateWeight;
+  private int completedMapsUpdates;
   final private JobInProgress job;
   final private int threshholdToUse;
 
   public ResourceEstimator(JobInProgress job) {
     this.job = job;
     threshholdToUse = job.desiredMaps()/ 10;
-    mapBlowupRatio = 0;
-    estimateWeight = 1;
   }
 
+  protected synchronized void updateWithCompletedTask(TaskStatus ts, 
+      TaskInProgress tip) {
 
-  /**
-   * Have private access methods to abstract away synchro.
-   * @return
-   */
-  private synchronized double getBlowupRatio() {
-    return mapBlowupRatio;
-  }
-
-  private synchronized void setBlowupRatio(double b)  {
-    mapBlowupRatio = b;
-  }
-
-  void updateWithCompletedTask(TaskStatus ts, TaskInProgress tip) {
-    
     //-1 indicates error, which we don't average in.
     if(tip.isMapTask() &&  ts.getOutputSize() != -1)  {
-      double blowupOnThisTask = ts.getOutputSize() / 
-        ((double) tip.getMapInputSize() + 1);
-      
-      LOG.info("measured blowup on " + tip.getTIPId() + " was " +
-          ts.getOutputSize() + "/" +(tip.getMapInputSize()+1) + " = " 
-          + blowupOnThisTask);
-      
-      double newEstimate;
-      synchronized(this) {
-        newEstimate = blowupOnThisTask / estimateWeight + 
-            ((estimateWeight - 1) / estimateWeight) * getBlowupRatio();
-        estimateWeight++; 
-      }
-      setBlowupRatio(newEstimate);
-      
-      LOG.info("new estimate is blowup = " + newEstimate);
+      completedMapsUpdates++;
+
+      completedMapsInputSize+=(tip.getMapInputSize()+1);
+      completedMapsOutputSize+=ts.getOutputSize();
+
+      LOG.info("completedMapsUpdates:"+completedMapsUpdates+"  "+
+          "completedMapsInputSize:"+completedMapsInputSize+"  " +
+        "completedMapsOutputSize:"+completedMapsOutputSize);
     }
   }
 
   /**
    * @return estimated length of this job's total map output
    */
-  protected long getEstimatedTotalMapOutputSize()  {
-    double estWeight;
-    synchronized(this) {
-      estWeight = this.estimateWeight;
-    }
-    
-    if(estWeight < threshholdToUse) {
+  protected synchronized long getEstimatedTotalMapOutputSize()  {
+    if(completedMapsUpdates < threshholdToUse) {
       return 0;
     } else {
-      double blowup =getBlowupRatio();
       long inputSize = job.getInputLength() + job.desiredMaps(); 
       //add desiredMaps() so that randomwriter case doesn't blow up
-      long estimate = Math.round(inputSize * blowup * 2.0);
-  
-      LOG.debug("estimate total map output will be " + estimate +
-          " bytes. (blowup = 2*" + blowup + ")");
+      long estimate = Math.round((inputSize * 
+          completedMapsOutputSize * 2.0)/completedMapsInputSize);
+      LOG.debug("estimate total map output will be " + estimate);
       return estimate;
     }
   }

+ 47 - 1
src/test/org/apache/hadoop/mapred/TestResourceEstimation.java

@@ -36,7 +36,7 @@ public class TestResourceEstimation extends TestCase {
     //unfortunately, we can't set job input size from here.
     ResourceEstimator re = new ResourceEstimator(jip);
     
-    for(int i = 0; i < maps / 10 -1; ++i) {
+    for(int i = 0; i < maps / 10 ; ++i) {
 
       long estOutSize = re.getEstimatedMapOutputSize();
       System.out.println(estOutSize);
@@ -49,10 +49,56 @@ public class TestResourceEstimation extends TestCase {
       TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0);
       re.updateWithCompletedTask(ts, tip);
     }
+    assertEquals(2* singleMapOutputSize, re.getEstimatedMapOutputSize());
+    assertEquals(2* singleMapOutputSize * maps / reduces, re.getEstimatedReduceInputSize());
+    
+  }
+  
+  public void testWithNonZeroInput() throws Exception {
+    final int maps = 100;
+    final int reduces = 2;
+    final int singleMapOutputSize = 1000;
+    final int singleMapInputSize = 500;
+    JobConf jc = new JobConf();
+    JobID jid = new JobID("testJT", 0);
+    jc.setNumMapTasks(maps);
+    jc.setNumReduceTasks(reduces);
+    
+    JobInProgress jip = new JobInProgress(jid, jc) {
+      long getInputLength() {
+        return singleMapInputSize*desiredMaps();
+      }
+    };
+    ResourceEstimator re = new ResourceEstimator(jip);
+    
+    for(int i = 0; i < maps / 10 ; ++i) {
+
+      long estOutSize = re.getEstimatedMapOutputSize();
+      System.out.println(estOutSize);
+      assertEquals(0, estOutSize);
+      
+      TaskStatus ts = new MapTaskStatus();
+      ts.setOutputSize(singleMapOutputSize);
+      RawSplit split = new RawSplit();
+      split.setDataLength(singleMapInputSize);
+      TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0);
+      re.updateWithCompletedTask(ts, tip);
+    }
     
     assertEquals(2* singleMapOutputSize, re.getEstimatedMapOutputSize());
     assertEquals(2* singleMapOutputSize * maps / reduces, re.getEstimatedReduceInputSize());
+
+    //add one more map task with input size as 0
+    TaskStatus ts = new MapTaskStatus();
+    ts.setOutputSize(singleMapOutputSize);
+    RawSplit split = new RawSplit();
+    split.setDataLength(0);
+    TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0);
+    re.updateWithCompletedTask(ts, tip);
     
+    long expectedTotalMapOutSize = (singleMapOutputSize*11) * 
+      ((maps*singleMapInputSize)+maps)/((singleMapInputSize+1)*10+1);
+    assertEquals(2* expectedTotalMapOutSize/maps, re.getEstimatedMapOutputSize());
   }
 
 }