|
@@ -17,12 +17,13 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.mapred.gridmix;
|
|
|
|
|
|
+import static org.apache.hadoop.mapred.Task.Counter.SPILLED_RECORDS;
|
|
|
+
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.io.NullWritable;
|
|
|
-import org.apache.hadoop.mapred.JobConf;
|
|
|
import org.apache.hadoop.mapred.TaskTracker;
|
|
|
import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.ResourceUsageMatcher;
|
|
|
import org.apache.hadoop.mapreduce.InputFormat;
|
|
@@ -72,7 +73,7 @@ class LoadJob extends GridmixJob {
|
|
|
job.setNumReduceTasks(jobdesc.getNumberReduces());
|
|
|
job.setMapOutputKeyClass(GridmixKey.class);
|
|
|
job.setMapOutputValueClass(GridmixRecord.class);
|
|
|
- job.setSortComparatorClass(GridmixKey.Comparator.class);
|
|
|
+ job.setSortComparatorClass(LoadSortComparator.class);
|
|
|
job.setGroupingComparatorClass(SpecGroupingComparator.class);
|
|
|
job.setInputFormatClass(LoadInputFormat.class);
|
|
|
job.setOutputFormatClass(RawBytesOutputFormat.class);
|
|
@@ -94,18 +95,85 @@ class LoadJob extends GridmixJob {
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * This is a load matching key comparator which will make sure that the
|
|
|
+ * resource usage load is matched even when the framework is in control.
|
|
|
+ */
|
|
|
+ public static class LoadSortComparator extends GridmixKey.Comparator {
|
|
|
+ private ResourceUsageMatcherRunner matcher = null;
|
|
|
+ private boolean isConfigured = false;
|
|
|
+
|
|
|
+ public LoadSortComparator() {
|
|
|
+ super();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
|
|
|
+ configure();
|
|
|
+ int ret = super.compare(b1, s1, l1, b2, s2, l2);
|
|
|
+ if (matcher != null) {
|
|
|
+ try {
|
|
|
+ matcher.match(); // match the resource usage now
|
|
|
+ } catch (Exception e) {}
|
|
|
+ }
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+
|
|
|
+ //TODO Note that the sorter will be instantiated 2 times as follows
|
|
|
+ // 1. During the sort/spill in the map phase
|
|
|
+ // 2. During the merge in the sort phase
|
|
|
+ // We need the handle to the matcher thread only in (2).
|
|
|
+ // This logic can be relaxed to run only in (2).
|
|
|
+ private void configure() {
|
|
|
+ if (!isConfigured) {
|
|
|
+ ThreadGroup group = Thread.currentThread().getThreadGroup();
|
|
|
+ Thread[] threads = new Thread[group.activeCount() * 2];
|
|
|
+ group.enumerate(threads, true);
|
|
|
+ for (Thread t : threads) {
|
|
|
+ if (t != null && (t instanceof ResourceUsageMatcherRunner)) {
|
|
|
+ this.matcher = (ResourceUsageMatcherRunner) t;
|
|
|
+ isConfigured = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* This is a progress based resource usage matcher.
|
|
|
*/
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- static class ResourceUsageMatcherRunner extends Thread {
|
|
|
+ static class ResourceUsageMatcherRunner extends Thread
|
|
|
+ implements Progressive {
|
|
|
private final ResourceUsageMatcher matcher;
|
|
|
- private final Progressive progress;
|
|
|
+ private final BoostingProgress progress;
|
|
|
private final long sleepTime;
|
|
|
private static final String SLEEP_CONFIG =
|
|
|
"gridmix.emulators.resource-usage.sleep-duration";
|
|
|
private static final long DEFAULT_SLEEP_TIME = 100; // 100ms
|
|
|
|
|
|
+ /**
|
|
|
+ * This is a progress bar that can be boosted for weaker use-cases.
|
|
|
+ */
|
|
|
+ private static class BoostingProgress implements Progressive {
|
|
|
+ private float boostValue = 0f;
|
|
|
+ TaskInputOutputContext context;
|
|
|
+
|
|
|
+ BoostingProgress(TaskInputOutputContext context) {
|
|
|
+ this.context = context;
|
|
|
+ }
|
|
|
+
|
|
|
+ void setBoostValue(float boostValue) {
|
|
|
+ this.boostValue = boostValue;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public float getProgress() {
|
|
|
+ return Math.min(1f, context.getProgress() + boostValue);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
ResourceUsageMatcherRunner(final TaskInputOutputContext context,
|
|
|
ResourceUsageMetrics metrics) {
|
|
|
Configuration conf = context.getConfiguration();
|
|
@@ -119,19 +187,14 @@ class LoadJob extends GridmixJob {
|
|
|
|
|
|
// set the other parameters
|
|
|
this.sleepTime = conf.getLong(SLEEP_CONFIG, DEFAULT_SLEEP_TIME);
|
|
|
- progress = new Progressive() {
|
|
|
- @Override
|
|
|
- public float getProgress() {
|
|
|
- return context.getProgress();
|
|
|
- }
|
|
|
- };
|
|
|
+ progress = new BoostingProgress(context);
|
|
|
|
|
|
// instantiate a resource-usage-matcher
|
|
|
matcher = new ResourceUsageMatcher();
|
|
|
matcher.configure(conf, plugin, metrics, progress);
|
|
|
}
|
|
|
|
|
|
- protected void match() throws Exception {
|
|
|
+ protected void match() throws IOException, InterruptedException {
|
|
|
// match the resource usage
|
|
|
matcher.matchResourceUsage();
|
|
|
}
|
|
@@ -158,21 +221,34 @@ class LoadJob extends GridmixJob {
|
|
|
+ " thread! Exiting.", e);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public float getProgress() {
|
|
|
+ return matcher.getProgress();
|
|
|
+ }
|
|
|
+
|
|
|
+ // boost the progress bar as fasten up the emulation cycles.
|
|
|
+ void boost(float value) {
|
|
|
+ progress.setBoostValue(value);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Makes sure that the TaskTracker doesn't kill the map/reduce tasks while
|
|
|
// they are emulating
|
|
|
private static class StatusReporter extends Thread {
|
|
|
private TaskInputOutputContext context;
|
|
|
- StatusReporter(TaskInputOutputContext context) {
|
|
|
+ private final Progressive progress;
|
|
|
+
|
|
|
/**
 * Creates a status reporter.
 *
 * @param context the task context on which progress is reported
 * @param progress the progress source polled by this reporter's loop
 */
StatusReporter(TaskInputOutputContext context, Progressive progress) {
  this.progress = progress;
  this.context = context;
}
|
|
|
|
|
|
@Override
|
|
|
public void run() {
|
|
|
LOG.info("Status reporter thread started.");
|
|
|
try {
|
|
|
- while (context.getProgress() < 1) {
|
|
|
+ while (!isInterrupted() && progress.getProgress() < 1) {
|
|
|
// report progress
|
|
|
context.progress();
|
|
|
|
|
@@ -275,7 +351,7 @@ class LoadJob extends GridmixJob {
|
|
|
split.getMapResourceUsageMetrics());
|
|
|
|
|
|
// start the status reporter thread
|
|
|
- reporter = new StatusReporter(ctxt);
|
|
|
+ reporter = new StatusReporter(ctxt, matcher);
|
|
|
reporter.start();
|
|
|
}
|
|
|
|
|
@@ -322,6 +398,17 @@ class LoadJob extends GridmixJob {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // check if the thread will get a chance to run or not
|
|
|
+ // check if there will be a sort&spill->merge phase or not
|
|
|
+ // check if the final sort&spill->merge phase is going to happen or not
|
|
|
+ if (context.getNumReduceTasks() > 0
|
|
|
+ && context.getCounter(SPILLED_RECORDS).getValue() == 0) {
|
|
|
+ LOG.info("Boosting the map phase progress.");
|
|
|
+ // add the sort phase progress to the map phase and emulate
|
|
|
+ matcher.boost(0.33f);
|
|
|
+ matcher.match();
|
|
|
+ }
|
|
|
+
|
|
|
// start the matcher thread since the map phase ends here
|
|
|
matcher.start();
|
|
|
}
|
|
@@ -390,7 +477,7 @@ class LoadJob extends GridmixJob {
|
|
|
matcher = new ResourceUsageMatcherRunner(context, metrics);
|
|
|
|
|
|
// start the status reporter thread
|
|
|
- reporter = new StatusReporter(context);
|
|
|
+ reporter = new StatusReporter(context, matcher);
|
|
|
reporter.start();
|
|
|
}
|
|
|
@Override
|
|
@@ -524,8 +611,12 @@ class LoadJob extends GridmixJob {
|
|
|
specRecords[j] = info.getOutputRecords();
|
|
|
metrics[j] = info.getResourceUsageMetrics();
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
|
|
|
- i + j * maps, info.getOutputRecords(), info.getOutputBytes()));
|
|
|
+ LOG.debug(String.format("SPEC(%d) %d -> %d %d %d %d %d %d %d", id(),
|
|
|
+ i, i + j * maps, info.getOutputRecords(), info.getOutputBytes(),
|
|
|
+ info.getResourceUsageMetrics().getCumulativeCpuUsage(),
|
|
|
+ info.getResourceUsageMetrics().getPhysicalMemoryUsage(),
|
|
|
+ info.getResourceUsageMetrics().getVirtualMemoryUsage(),
|
|
|
+ info.getResourceUsageMetrics().getHeapUsage()));
|
|
|
}
|
|
|
}
|
|
|
final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
|