|
@@ -106,11 +106,11 @@ public class JobStatistics implements JobStatisticsInterface {
|
|
|
* Get Job Counters of type String
|
|
|
*/
|
|
|
public String getStringValue(Enum key) {
|
|
|
- if (this._job.get(key) == null) {
|
|
|
- return "";
|
|
|
- } else {
|
|
|
+ if (this._job.get(key) == null) {
|
|
|
+ return "";
|
|
|
+ } else {
|
|
|
return this._job.get(key);
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -154,6 +154,13 @@ public class JobStatistics implements JobStatisticsInterface {
|
|
|
this._job = new Hashtable<Enum, String>();
|
|
|
populate_Job(this._job, this._jobInfo.getValues());
|
|
|
populate_MapReduceTaskLists(this._mapTaskList, this._reduceTaskList, this._jobInfo.getAllTasks());
|
|
|
+
|
|
|
+ // Add the Job Type: MAP_REDUCE, MAP_ONLY
|
|
|
+ if (getLongValue(JobKeys.TOTAL_REDUCES) == 0) {
|
|
|
+ this._job.put(JobKeys.JOBTYPE,"MAP_ONLY");
|
|
|
+ } else {
|
|
|
+ this._job.put(JobKeys.JOBTYPE,"MAP_REDUCE");
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -179,7 +186,7 @@ public class JobStatistics implements JobStatisticsInterface {
|
|
|
if (successTaskAttemptMap != null) {
|
|
|
mapTask.putAll(successTaskAttemptMap);
|
|
|
} else {
|
|
|
- System.out.println("Task:<"+task.get(Keys.TASKID)+"> is not successful - SKIPPING");
|
|
|
+ System.err.println("Task:<"+task.get(Keys.TASKID)+"> is not successful - SKIPPING");
|
|
|
}
|
|
|
int size = mapTask.size();
|
|
|
java.util.Iterator<Map.Entry<JobHistory.Keys, String>> kv = mapTask.entrySet().iterator();
|
|
@@ -207,63 +214,73 @@ public class JobStatistics implements JobStatisticsInterface {
|
|
|
parseAndAddMapTaskCounters(mapT, value);
|
|
|
mapTaskList.add(mapT);
|
|
|
break;
|
|
|
- default: System.out.println("JobHistory.MapKeys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR MAP COUNTERS");
|
|
|
+ default: System.err.println("JobHistory.MapKeys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR MAP COUNTERS");
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// Add number of task attempts
|
|
|
mapT.setValue(MapTaskKeys.NUM_ATTEMPTS, (new Integer(task.getTaskAttempts().size())).toString());
|
|
|
+
|
|
|
+ // Add EXECUTION_TIME = FINISH_TIME - START_TIME
|
|
|
+ long etime = mapT.getLongValue(MapTaskKeys.FINISH_TIME) - mapT.getLongValue(MapTaskKeys.START_TIME);
|
|
|
+ mapT.setValue(MapTaskKeys.EXECUTION_TIME, (new Long(etime)).toString());
|
|
|
|
|
|
}else if (task.get(Keys.TASK_TYPE).equals("REDUCE")) {
|
|
|
- ReduceTaskStatistics reduceT = new ReduceTaskStatistics();
|
|
|
- java.util.Map<JobHistory.Keys, String> reduceTask = task.getValues();
|
|
|
- java.util.Map<JobHistory.Keys, String> successTaskAttemptMap = getLastSuccessfulTaskAttempt(task);
|
|
|
- // NOTE: Following would lead to less number of actual tasks collected in the tasklist array
|
|
|
- if (successTaskAttemptMap != null) {
|
|
|
- reduceTask.putAll(successTaskAttemptMap);
|
|
|
- } else {
|
|
|
- System.out.println("Task:<"+task.get(Keys.TASKID)+"> is not successful - SKIPPING");
|
|
|
- }
|
|
|
- int size = reduceTask.size();
|
|
|
- java.util.Iterator<Map.Entry<JobHistory.Keys, String>> kv = reduceTask.entrySet().iterator();
|
|
|
- for (int j = 0; j < size; j++)
|
|
|
- {
|
|
|
- Map.Entry<JobHistory.Keys, String> rtc = kv.next();
|
|
|
- JobHistory.Keys key = rtc.getKey();
|
|
|
- String value = rtc.getValue();
|
|
|
- //System.out.println("JobHistory.ReduceKeys."+key+": "+value);
|
|
|
- switch (key) {
|
|
|
- case TASKID: reduceT.setValue(ReduceTaskKeys.TASK_ID, value); break;
|
|
|
- case TASK_ATTEMPT_ID: reduceT.setValue(ReduceTaskKeys.ATTEMPT_ID, value); break;
|
|
|
- case HOSTNAME: reduceT.setValue(ReduceTaskKeys.HOSTNAME, value); break;
|
|
|
- case TASK_TYPE: reduceT.setValue(ReduceTaskKeys.TASK_TYPE, value); break;
|
|
|
- case TASK_STATUS: reduceT.setValue(ReduceTaskKeys.STATUS, value); break;
|
|
|
- case START_TIME: reduceT.setValue(ReduceTaskKeys.START_TIME, value); break;
|
|
|
- case FINISH_TIME: reduceT.setValue(ReduceTaskKeys.FINISH_TIME, value); break;
|
|
|
- case SHUFFLE_FINISHED: reduceT.setValue(ReduceTaskKeys.SHUFFLE_FINISH_TIME, value); break;
|
|
|
- case SORT_FINISHED: reduceT.setValue(ReduceTaskKeys.SORT_FINISH_TIME, value); break;
|
|
|
- case SPLITS: reduceT.setValue(ReduceTaskKeys.SPLITS, value); break;
|
|
|
- case TRACKER_NAME: reduceT.setValue(ReduceTaskKeys.TRACKER_NAME, value); break;
|
|
|
- case STATE_STRING: reduceT.setValue(ReduceTaskKeys.STATE_STRING, value); break;
|
|
|
- case HTTP_PORT: reduceT.setValue(ReduceTaskKeys.HTTP_PORT, value); break;
|
|
|
- case COUNTERS:
|
|
|
- value.concat(",");
|
|
|
- parseAndAddReduceTaskCounters(reduceT, value);
|
|
|
- reduceTaskList.add(reduceT);
|
|
|
- break;
|
|
|
- default: System.out.println("JobHistory.ReduceKeys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE COUNTERS");
|
|
|
- break;
|
|
|
+
|
|
|
+ ReduceTaskStatistics reduceT = new ReduceTaskStatistics();
|
|
|
+ java.util.Map<JobHistory.Keys, String> reduceTask = task.getValues();
|
|
|
+ java.util.Map<JobHistory.Keys, String> successTaskAttemptMap = getLastSuccessfulTaskAttempt(task);
|
|
|
+ // NOTE: Following would lead to less number of actual tasks collected in the tasklist array
|
|
|
+ if (successTaskAttemptMap != null) {
|
|
|
+ reduceTask.putAll(successTaskAttemptMap);
|
|
|
+ } else {
|
|
|
+ System.err.println("Task:<"+task.get(Keys.TASKID)+"> is not successful - SKIPPING");
|
|
|
+ }
|
|
|
+ int size = reduceTask.size();
|
|
|
+ java.util.Iterator<Map.Entry<JobHistory.Keys, String>> kv = reduceTask.entrySet().iterator();
|
|
|
+ for (int j = 0; j < size; j++)
|
|
|
+ {
|
|
|
+ Map.Entry<JobHistory.Keys, String> rtc = kv.next();
|
|
|
+ JobHistory.Keys key = rtc.getKey();
|
|
|
+ String value = rtc.getValue();
|
|
|
+ //System.out.println("JobHistory.ReduceKeys."+key+": "+value);
|
|
|
+ switch (key) {
|
|
|
+ case TASKID: reduceT.setValue(ReduceTaskKeys.TASK_ID, value); break;
|
|
|
+ case TASK_ATTEMPT_ID: reduceT.setValue(ReduceTaskKeys.ATTEMPT_ID, value); break;
|
|
|
+ case HOSTNAME: reduceT.setValue(ReduceTaskKeys.HOSTNAME, value); break;
|
|
|
+ case TASK_TYPE: reduceT.setValue(ReduceTaskKeys.TASK_TYPE, value); break;
|
|
|
+ case TASK_STATUS: reduceT.setValue(ReduceTaskKeys.STATUS, value); break;
|
|
|
+ case START_TIME: reduceT.setValue(ReduceTaskKeys.START_TIME, value); break;
|
|
|
+ case FINISH_TIME: reduceT.setValue(ReduceTaskKeys.FINISH_TIME, value); break;
|
|
|
+ case SHUFFLE_FINISHED: reduceT.setValue(ReduceTaskKeys.SHUFFLE_FINISH_TIME, value); break;
|
|
|
+ case SORT_FINISHED: reduceT.setValue(ReduceTaskKeys.SORT_FINISH_TIME, value); break;
|
|
|
+ case SPLITS: reduceT.setValue(ReduceTaskKeys.SPLITS, value); break;
|
|
|
+ case TRACKER_NAME: reduceT.setValue(ReduceTaskKeys.TRACKER_NAME, value); break;
|
|
|
+ case STATE_STRING: reduceT.setValue(ReduceTaskKeys.STATE_STRING, value); break;
|
|
|
+ case HTTP_PORT: reduceT.setValue(ReduceTaskKeys.HTTP_PORT, value); break;
|
|
|
+ case COUNTERS:
|
|
|
+ value.concat(",");
|
|
|
+ parseAndAddReduceTaskCounters(reduceT, value);
|
|
|
+ reduceTaskList.add(reduceT);
|
|
|
+ break;
|
|
|
+ default: System.err.println("JobHistory.ReduceKeys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE COUNTERS");
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// Add number of task attempts
|
|
|
reduceT.setValue(ReduceTaskKeys.NUM_ATTEMPTS, (new Integer(task.getTaskAttempts().size())).toString());
|
|
|
- }
|
|
|
+
|
|
|
+ // Add EXECUTION_TIME = FINISH_TIME - START_TIME
|
|
|
+ long etime1 = reduceT.getLongValue(ReduceTaskKeys.FINISH_TIME) - reduceT.getLongValue(ReduceTaskKeys.START_TIME);
|
|
|
+ reduceT.setValue(ReduceTaskKeys.EXECUTION_TIME, (new Long(etime1)).toString());
|
|
|
+
|
|
|
} else if (task.get(Keys.TASK_TYPE).equals("CLEANUP") ||
|
|
|
task.get(Keys.TASK_TYPE).equals("SETUP")) {
|
|
|
//System.out.println("INFO: IGNORING TASK TYPE : "+task.get(Keys.TASK_TYPE));
|
|
|
} else {
|
|
|
- System.out.println("UNKNOWN TASK TYPE : "+task.get(Keys.TASK_TYPE));
|
|
|
+ System.err.println("UNKNOWN TASK TYPE : "+task.get(Keys.TASK_TYPE));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -302,7 +319,6 @@ public class JobStatistics implements JobStatisticsInterface {
|
|
|
//System.out.println("JobHistory.JobKeys."+key+": "+value);
|
|
|
switch (key) {
|
|
|
case JOBTRACKERID: job.put(JobKeys.JOBTRACKERID, value); break;
|
|
|
- //case START_TIME: job.put(JobKeys., value); break;
|
|
|
case FINISH_TIME: job.put(JobKeys.FINISH_TIME, value); break;
|
|
|
case JOBID: job.put(JobKeys.JOBID, value); break;
|
|
|
case JOBNAME: job.put(JobKeys.JOBNAME, value); break;
|
|
@@ -322,7 +338,7 @@ public class JobStatistics implements JobStatisticsInterface {
|
|
|
value.concat(",");
|
|
|
parseAndAddJobCounters(job, value);
|
|
|
break;
|
|
|
- default: System.out.println("JobHistory.Keys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR COUNTERS");
|
|
|
+ default: System.err.println("JobHistory.Keys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR COUNTERS");
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
@@ -339,15 +355,15 @@ public class JobStatistics implements JobStatisticsInterface {
|
|
|
for (java.util.Iterator<Counters.Counter> mycounters = grp.iterator(); mycounters.hasNext(); ) {
|
|
|
Counters.Counter counter = mycounters.next();
|
|
|
//String countername = "<"+counter.getName()+">::<"+counter.getDisplayName()+">::<"+counter.getValue()+">";
|
|
|
- //System.out.println("groupName:"+groupname+",countername: "+countername);
|
|
|
+ //System.err.println("groupName:"+groupname+",countername: "+countername);
|
|
|
String countername = grp.getDisplayName()+"."+counter.getDisplayName();
|
|
|
String value = (new Long(counter.getValue())).toString();
|
|
|
String[] parts = {countername,value};
|
|
|
- //System.out.println("part0:"+parts[0]+",:part1 "+parts[1]);
|
|
|
+ //System.err.println("part0:<"+parts[0]+">,:part1 <"+parts[1]+">");
|
|
|
if (parts[0].equals("FileSystemCounters.FILE_BYTES_READ")) {
|
|
|
- job.put(JobKeys.LOCAL_BYTES_READ, parts[1]);
|
|
|
+ job.put(JobKeys.FILE_BYTES_READ, parts[1]);
|
|
|
} else if (parts[0].equals("FileSystemCounters.FILE_BYTES_WRITTEN")) {
|
|
|
- job.put(JobKeys.LOCAL_BYTES_WRITTEN, parts[1]);
|
|
|
+ job.put(JobKeys.FILE_BYTES_WRITTEN, parts[1]);
|
|
|
} else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_READ")) {
|
|
|
job.put(JobKeys.HDFS_BYTES_READ, parts[1]);
|
|
|
} else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_WRITTEN")) {
|
|
@@ -383,7 +399,7 @@ public class JobStatistics implements JobStatisticsInterface {
|
|
|
} else if (parts[0].equals("Map-Reduce Framework.Reduce shuffle bytes")) {
|
|
|
job.put(JobKeys.SHUFFLE_BYTES, parts[1]);
|
|
|
} else {
|
|
|
- System.out.println("JobCounterKey:<"+parts[0]+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR");
|
|
|
+ System.err.println("JobCounterKey:<"+parts[0]+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -406,9 +422,9 @@ public class JobStatistics implements JobStatisticsInterface {
|
|
|
String[] parts = {countername,value};
|
|
|
//System.out.println("part0:"+parts[0]+",:part1 "+parts[1]);
|
|
|
if (parts[0].equals("FileSystemCounters.FILE_BYTES_READ")) {
|
|
|
- mapTask.setValue(MapTaskKeys.LOCAL_BYTES_READ, parts[1]);
|
|
|
+ mapTask.setValue(MapTaskKeys.FILE_BYTES_READ, parts[1]);
|
|
|
} else if (parts[0].equals("FileSystemCounters.FILE_BYTES_WRITTEN")) {
|
|
|
- mapTask.setValue(MapTaskKeys.LOCAL_BYTES_WRITTEN, parts[1]);
|
|
|
+ mapTask.setValue(MapTaskKeys.FILE_BYTES_WRITTEN, parts[1]);
|
|
|
} else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_READ")) {
|
|
|
mapTask.setValue(MapTaskKeys.HDFS_BYTES_READ, parts[1]);
|
|
|
} else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_WRITTEN")) {
|
|
@@ -428,7 +444,7 @@ public class JobStatistics implements JobStatisticsInterface {
|
|
|
} else if (parts[0].equals("Map-Reduce Framework.Spilled Records")) {
|
|
|
mapTask.setValue(MapTaskKeys.SPILLED_RECORDS, parts[1]);
|
|
|
} else {
|
|
|
- System.out.println("MapCounterKey:<"+parts[0]+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR MAP TASK");
|
|
|
+ System.err.println("MapCounterKey:<"+parts[0]+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR MAP TASK");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -451,9 +467,9 @@ public class JobStatistics implements JobStatisticsInterface {
|
|
|
String[] parts = {countername,value};
|
|
|
//System.out.println("part0:"+parts[0]+",:part1 "+parts[1]);
|
|
|
if (parts[0].equals("FileSystemCounters.FILE_BYTES_READ")) {
|
|
|
- reduceTask.setValue(ReduceTaskKeys.LOCAL_BYTES_READ, parts[1]);
|
|
|
+ reduceTask.setValue(ReduceTaskKeys.FILE_BYTES_READ, parts[1]);
|
|
|
} else if (parts[0].equals("FileSystemCounters.FILE_BYTES_WRITTEN")) {
|
|
|
- reduceTask.setValue(ReduceTaskKeys.LOCAL_BYTES_WRITTEN, parts[1]);
|
|
|
+ reduceTask.setValue(ReduceTaskKeys.FILE_BYTES_WRITTEN, parts[1]);
|
|
|
} else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_READ")) {
|
|
|
reduceTask.setValue(ReduceTaskKeys.HDFS_BYTES_READ, parts[1]);
|
|
|
} else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_WRITTEN")) {
|
|
@@ -473,7 +489,7 @@ public class JobStatistics implements JobStatisticsInterface {
|
|
|
} else if (parts[0].equals("Map-Reduce Framework.Reduce shuffle bytes")) {
|
|
|
reduceTask.setValue(ReduceTaskKeys.SHUFFLE_BYTES, parts[1]);
|
|
|
} else {
|
|
|
- System.out.println("ReduceCounterKey:<"+parts[0]+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE TASK");
|
|
|
+ System.err.println("ReduceCounterKey:<"+parts[0]+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE TASK");
|
|
|
}
|
|
|
}
|
|
|
}
|