|
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.fs.permission.FsAction;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.mapreduce.TaskCounter;
|
|
|
import org.apache.hadoop.mapreduce.Counters;
|
|
|
import org.apache.hadoop.mapreduce.Counter;
|
|
|
import org.apache.hadoop.mapreduce.CounterGroup;
|
|
@@ -105,7 +106,7 @@ public class GridmixJobVerification {
|
|
|
* @throws ParseException - if an parse error occurs.
|
|
|
*/
|
|
|
public void verifyGridmixJobsWithJobStories(List<JobID> jobids)
|
|
|
- throws IOException, ParseException {
|
|
|
+ throws Exception {
|
|
|
|
|
|
SortedMap <Long, String> origSubmissionTime = new TreeMap <Long, String>();
|
|
|
SortedMap <Long, String> simuSubmissionTime = new TreeMap<Long, String>();
|
|
@@ -147,6 +148,7 @@ public class GridmixJobVerification {
|
|
|
setJobDistributedCacheInfo(simuJobId.toString(), simuJobConf,
|
|
|
zombieJob.getJobConf());
|
|
|
verifyHighRamMemoryJobs(zombieJob, simuJobConf);
|
|
|
+ verifyCPUEmulationOfJobs(zombieJob, jhInfo, simuJobConf);
|
|
|
LOG.info("Done.");
|
|
|
}
|
|
|
verifyDistributedCacheBetweenJobs(simuAndOrigJobsInfo);
|
|
@@ -353,6 +355,119 @@ public class GridmixJobVerification {
|
|
|
fs.close();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * It verifies the cpu resource usage of a gridmix job against
|
|
|
+ * their original job.
|
|
|
+ * @param origJobHistory - Original job history.
|
|
|
+ * @param simuJobHistoryInfo - Simulated job history.
|
|
|
+ * @param simuJobConf - simulated job configuration.
|
|
|
+ */
|
|
|
+ public void verifyCPUEmulationOfJobs(ZombieJob origJobHistory,
|
|
|
+ JobHistoryParser.JobInfo simuJobHistoryInfo,
|
|
|
+ JobConf simuJobConf) throws Exception {
|
|
|
+
|
|
|
+ if (simuJobConf.get(GridMixConfig.GRIDMIX_CPU_EMULATON) != null) {
|
|
|
+ Map<String,Long> origJobMetrics =
|
|
|
+ getOriginalJobCPUMetrics(origJobHistory);
|
|
|
+ Map<String,Long> simuJobMetrics =
|
|
|
+ getSimulatedJobCPUMetrics(simuJobHistoryInfo);
|
|
|
+
|
|
|
+ long origMapUsage = origJobMetrics.get("MAP");
|
|
|
+ LOG.info("Maps cpu usage of original job:" + origMapUsage);
|
|
|
+
|
|
|
+ long origReduceUsage = origJobMetrics.get("REDUCE");
|
|
|
+ LOG.info("Reduces cpu usage of original job:" + origReduceUsage);
|
|
|
+
|
|
|
+ long simuMapUsage = simuJobMetrics.get("MAP");
|
|
|
+ LOG.info("Maps cpu usage of simulated job:" + simuMapUsage);
|
|
|
+
|
|
|
+ long simuReduceUsage = simuJobMetrics.get("REDUCE");
|
|
|
+ LOG.info("Reduces cpu usage of simulated job:"+ simuReduceUsage);
|
|
|
+
|
|
|
+ long mapCount = simuJobHistoryInfo.getTotalMaps();
|
|
|
+ long reduceCount = simuJobHistoryInfo.getTotalReduces();
|
|
|
+
|
|
|
+ if (mapCount > 0) {
|
|
|
+ double mapEmulFactor = (simuMapUsage * 100) / origMapUsage;
|
|
|
+ long mapEmulAccuracy = Math.round(mapEmulFactor);
|
|
|
+ LOG.info("CPU emulation accuracy for maps in job " +
|
|
|
+ simuJobHistoryInfo.getJobId() +
|
|
|
+ ":"+ mapEmulAccuracy + "%");
|
|
|
+ Assert.assertTrue("Map-side cpu emulaiton inaccurate!" +
|
|
|
+ " Actual cpu usage: " + simuMapUsage +
|
|
|
+ " Expected cpu usage: " + origMapUsage, mapEmulAccuracy
|
|
|
+ >= GridMixConfig.GRIDMIX_CPU_EMULATION_LOWER_LIMIT
|
|
|
+ && mapEmulAccuracy
|
|
|
+ <= GridMixConfig.GRIDMIX_CPU_EMULATION_UPPER_LIMIT);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (reduceCount >0) {
|
|
|
+ double reduceEmulFactor = (simuReduceUsage * 100) / origReduceUsage;
|
|
|
+ long reduceEmulAccuracy = Math.round(reduceEmulFactor);
|
|
|
+ LOG.info("CPU emulation accuracy for reduces in job " +
|
|
|
+ simuJobHistoryInfo.getJobId() +
|
|
|
+ ": " + reduceEmulAccuracy + "%");
|
|
|
+ Assert.assertTrue("Reduce side cpu emulaiton inaccurate!" +
|
|
|
+ " Actual cpu usage:" + simuReduceUsage +
|
|
|
+ "Expected cpu usage: " + origReduceUsage,
|
|
|
+ reduceEmulAccuracy
|
|
|
+ >= GridMixConfig.GRIDMIX_CPU_EMULATION_LOWER_LIMIT
|
|
|
+ && reduceEmulAccuracy
|
|
|
+ <= GridMixConfig.GRIDMIX_CPU_EMULATION_UPPER_LIMIT);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the simulated job cpu metrics.
|
|
|
+ * @param jhInfo - Simulated job history
|
|
|
+ * @return - cpu metrics as a map.
|
|
|
+ * @throws Exception - if an error occurs.
|
|
|
+ */
|
|
|
+ private Map<String,Long> getSimulatedJobCPUMetrics(
|
|
|
+ JobHistoryParser.JobInfo jhInfo) throws Exception {
|
|
|
+ Map<String, Long> resourceMetrics = new HashMap<String, Long>();
|
|
|
+ long mapCPUUsage =
|
|
|
+ getCounterValue(jhInfo.getMapCounters(),
|
|
|
+ TaskCounter.CPU_MILLISECONDS.toString());
|
|
|
+ resourceMetrics.put("MAP", mapCPUUsage);
|
|
|
+ long reduceCPUUsage =
|
|
|
+ getCounterValue(jhInfo.getReduceCounters(),
|
|
|
+ TaskCounter.CPU_MILLISECONDS.toString());
|
|
|
+ resourceMetrics.put("REDUCE", reduceCPUUsage);
|
|
|
+ return resourceMetrics;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the original job cpu metrics.
|
|
|
+ * @param zombieJob - original job history.
|
|
|
+ * @return - cpu metrics as map.
|
|
|
+ */
|
|
|
+ private Map<String, Long> getOriginalJobCPUMetrics(ZombieJob zombieJob) {
|
|
|
+ long mapTotalCPUUsage = 0;
|
|
|
+ long reduceTotalCPUUsage = 0;
|
|
|
+ Map<String,Long> resourceMetrics = new HashMap<String,Long>();
|
|
|
+
|
|
|
+ for (int index = 0; index < zombieJob.getNumberMaps(); index ++) {
|
|
|
+ TaskInfo mapTask = zombieJob.getTaskInfo(TaskType.MAP, index);
|
|
|
+ if (mapTask.getResourceUsageMetrics().getCumulativeCpuUsage() > 0) {
|
|
|
+ mapTotalCPUUsage +=
|
|
|
+ mapTask.getResourceUsageMetrics().getCumulativeCpuUsage();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ resourceMetrics.put("MAP", mapTotalCPUUsage);
|
|
|
+
|
|
|
+ for (int index = 0; index < zombieJob.getNumberReduces(); index ++) {
|
|
|
+ TaskInfo reduceTask = zombieJob.getTaskInfo(TaskType.REDUCE, index);
|
|
|
+ if (reduceTask.getResourceUsageMetrics().getCumulativeCpuUsage() > 0) {
|
|
|
+ reduceTotalCPUUsage +=
|
|
|
+ reduceTask.getResourceUsageMetrics().getCumulativeCpuUsage();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ resourceMetrics.put("REDUCE", reduceTotalCPUUsage);
|
|
|
+ return resourceMetrics;
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Get the user resolver of a job.
|