|
@@ -37,23 +37,23 @@ public class ResourceEstimator {
|
|
|
|
|
|
|
|
|
/**
|
|
|
- * Estimated ratio of output to input size for map tasks.
|
|
|
+ * Estimated ratio of output to (input size+1) for map tasks.
|
|
|
*/
|
|
|
private double mapBlowupRatio;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * How much relative weight to put on the current estimate.
|
|
|
+ * Each completed map has unit weight.
|
|
|
+ */
|
|
|
private double estimateWeight;
|
|
|
- private JobInProgress job;
|
|
|
-
|
|
|
- //guess a factor of two blowup due to temp space for merge
|
|
|
- public static final double INITIAL_BLOWUP_GUESS = 1;
|
|
|
-
|
|
|
- //initial estimate is weighted as much as this fraction of the real datapoints
|
|
|
- static final double INITIAL_EST_WEIGHT_PERCENT = 0.05;
|
|
|
-
|
|
|
+ final private JobInProgress job;
|
|
|
+ final private int threshholdToUse;
|
|
|
|
|
|
public ResourceEstimator(JobInProgress job) {
|
|
|
- mapBlowupRatio = INITIAL_BLOWUP_GUESS;
|
|
|
this.job = job;
|
|
|
- estimateWeight = INITIAL_EST_WEIGHT_PERCENT * job.desiredMaps();
|
|
|
+ threshholdToUse = job.desiredMaps()/ 10;
|
|
|
+ mapBlowupRatio = 0;
|
|
|
+ estimateWeight = 1;
|
|
|
}
|
|
|
|
|
|
|
|
@@ -69,45 +69,71 @@ public class ResourceEstimator {
|
|
|
mapBlowupRatio = b;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-
|
|
|
- public void updateWithCompletedTask(TaskStatus ts, TaskInProgress tip) {
|
|
|
-
|
|
|
+ void updateWithCompletedTask(TaskStatus ts, TaskInProgress tip) {
|
|
|
+
|
|
|
//-1 indicates error, which we don't average in.
|
|
|
if(tip.isMapTask() && ts.getOutputSize() != -1) {
|
|
|
double blowupOnThisTask = ts.getOutputSize() /
|
|
|
- (double) tip.getMapInputSize();
|
|
|
+ ((double) tip.getMapInputSize() + 1);
|
|
|
|
|
|
LOG.info("measured blowup on " + tip.getTIPId() + " was " +
|
|
|
- ts.getOutputSize() + "/" +tip.getMapInputSize() + " = "
|
|
|
+ ts.getOutputSize() + "/" +(tip.getMapInputSize()+1) + " = "
|
|
|
+ blowupOnThisTask);
|
|
|
|
|
|
- double newEstimate = blowupOnThisTask / estimateWeight +
|
|
|
- ((estimateWeight - 1) / estimateWeight) * getBlowupRatio();
|
|
|
- estimateWeight++;
|
|
|
+ double newEstimate;
|
|
|
+ synchronized(this) {
|
|
|
+ newEstimate = blowupOnThisTask / estimateWeight +
|
|
|
+ ((estimateWeight - 1) / estimateWeight) * getBlowupRatio();
|
|
|
+ estimateWeight++;
|
|
|
+ }
|
|
|
setBlowupRatio(newEstimate);
|
|
|
+
|
|
|
+ LOG.info("new estimate is blowup = " + newEstimate);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- *
|
|
|
+ * @return estimated length of this job's total map output
|
|
|
+ */
|
|
|
+ protected long getEstimatedTotalMapOutputSize() {
|
|
|
+ double estWeight;
|
|
|
+ synchronized(this) {
|
|
|
+ estWeight = this.estimateWeight;
|
|
|
+ }
|
|
|
+
|
|
|
+ if(estWeight < threshholdToUse) {
|
|
|
+ return 0;
|
|
|
+ } else {
|
|
|
+ double blowup =getBlowupRatio();
|
|
|
+ long inputSize = job.getInputLength() + job.desiredMaps();
|
|
|
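+ //add desiredMaps() (one per map) to mirror the +1 added to each map's input size when the blowup ratio is measured, so the randomwriter case (presumably zero-length input) doesn't blow up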
|
|
|
+ long estimate = Math.round(inputSize * blowup * 2.0);
|
|
|
+
|
|
|
+ LOG.debug("estimate total map output will be " + estimate +
|
|
|
+ " bytes. (blowup = 2*" + blowup + ")");
|
|
|
+ return estimate;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
* @return estimated length of this job's average map output
|
|
|
- * @throws IOException if the split's getLength() does.
|
|
|
*/
|
|
|
- public long getEstimatedMapOutputSize() {
|
|
|
- double blowup =getBlowupRatio();
|
|
|
- long estimate =
|
|
|
- (long) (job.getInputLength() * blowup / job.desiredMaps() * 2.0);
|
|
|
- LOG.info("estimate map will take " + estimate +
|
|
|
- " bytes. (blowup = 2*" + blowup + ")");
|
|
|
+ long getEstimatedMapOutputSize() {
|
|
|
+ long estimate = getEstimatedTotalMapOutputSize() / job.desiredMaps();
|
|
|
return estimate;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
- //estimate that each reduce gets an equal share of total map output
|
|
|
- public long getEstimatedReduceInputSize() {
|
|
|
- return
|
|
|
- getEstimatedMapOutputSize() * job.desiredMaps() / job.desiredReduces();
|
|
|
+ /**
|
|
|
+ *
|
|
|
+ * @return estimated length of this job's average reduce input
|
|
|
+ */
|
|
|
+ long getEstimatedReduceInputSize() {
|
|
|
+ if(job.desiredReduces() == 0) {//no reduces, so there is no reduce input to size
|
|
|
+ return 0;
|
|
|
+ } else {
|
|
|
+ return getEstimatedTotalMapOutputSize() / job.desiredReduces();
|
|
|
+ //estimate that each reduce gets an equal share of total map output
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
|