@@ -83,21 +83,34 @@ public class JobStatistics implements JobStatisticsInterface {
    * Get Job Counters of type long
    */
   public long getLongValue(Enum key) {
-    return Long.parseLong(this._job.get(key));
+    if (this._job.get(key) == null) {
+      return (long)0;
+    }
+    else {
+      return Long.parseLong(this._job.get(key));
+    }
   }
 
   /*
    * Get job Counters of type Double
    */
   public double getDoubleValue(Enum key) {
-    return Double.parseDouble(this._job.get(key));
+    if (this._job.get(key) == null) {
+      return (double)0;
+    } else {
+      return Double.parseDouble(this._job.get(key));
+    }
   }
 
   /*
    * Get Job Counters of type String
    */
   public String getStringValue(Enum key) {
-    return this._job.get(key);
+    if (this._job.get(key) == null) {
+      return "";
+    } else {
+      return this._job.get(key);
+    }
   }
 
   /*
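
Note: each getter above now degrades a missing counter to a neutral default (0,
0.0, or "") where the old code fed null into Long.parseLong / Double.parseDouble
or returned null outright. The same guard, factored as a helper purely for
illustration (a sketch, not part of the patch):

    // Illustration only: the contract the null guards establish.
    static long longOrZero(java.util.Hashtable<Enum, String> job, Enum key) {
      String raw = job.get(key);
      return (raw == null) ? 0L : Long.parseLong(raw);  // default instead of an exception
    }
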
@@ -139,7 +152,7 @@ public class JobStatistics implements JobStatisticsInterface {
     this._jobConf = jobConf;
     this._jobInfo = jobInfo;
     this._job = new Hashtable<Enum, String>();
-    populate_Job(this._job, this._jobInfo.getValues());
+    populate_Job(this._job, this._jobInfo.getValues());
     populate_MapReduceTaskLists(this._mapTaskList, this._reduceTaskList, this._jobInfo.getAllTasks());
   }
 
@@ -175,6 +188,7 @@ public class JobStatistics implements JobStatisticsInterface {
         Map.Entry<JobHistory.Keys, String> mtc = kv.next();
         JobHistory.Keys key = mtc.getKey();
         String value = mtc.getValue();
+        //System.out.println("JobHistory.MapKeys."+key+": "+value);
         switch (key) {
         case TASKID: mapT.setValue(MapTaskKeys.TASK_ID, value); break;
         case TASK_ATTEMPT_ID: mapT.setValue(MapTaskKeys.ATTEMPT_ID, value); break;
@@ -184,12 +198,16 @@ public class JobStatistics implements JobStatisticsInterface {
         case START_TIME: mapT.setValue(MapTaskKeys.START_TIME, value); break;
         case FINISH_TIME: mapT.setValue(MapTaskKeys.FINISH_TIME, value); break;
         case SPLITS: mapT.setValue(MapTaskKeys.SPLITS, value); break;
+        case TRACKER_NAME: mapT.setValue(MapTaskKeys.TRACKER_NAME, value); break;
+        case STATE_STRING: mapT.setValue(MapTaskKeys.STATE_STRING, value); break;
+        case HTTP_PORT: mapT.setValue(MapTaskKeys.HTTP_PORT, value); break;
+        case ERROR: mapT.setValue(MapTaskKeys.ERROR, value); break;
         case COUNTERS:
           value.concat(",");
           parseAndAddMapTaskCounters(mapT, value);
           mapTaskList.add(mapT);
           break;
-        default: System.out.println("JobHistory.Keys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR MAP COUNTERS");
+        default: System.out.println("JobHistory.MapKeys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR MAP COUNTERS");
           break;
         }
       }
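
Note: the unchanged context line value.concat(",") in the COUNTERS case is a
no-op. Java strings are immutable; String.concat returns a new string, and the
result is discarded here, so the counters string is passed on without a trailing
comma. For illustration:

    String value = "a:1";
    value.concat(",");         // returns "a:1," but the result is thrown away
    value = value.concat(","); // an actual append would have to reassign
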
@@ -214,6 +232,7 @@ public class JobStatistics implements JobStatisticsInterface {
         Map.Entry<JobHistory.Keys, String> rtc = kv.next();
         JobHistory.Keys key = rtc.getKey();
         String value = rtc.getValue();
+        //System.out.println("JobHistory.ReduceKeys."+key+": "+value);
         switch (key) {
         case TASKID: reduceT.setValue(ReduceTaskKeys.TASK_ID, value); break;
         case TASK_ATTEMPT_ID: reduceT.setValue(ReduceTaskKeys.ATTEMPT_ID, value); break;
@@ -224,19 +243,24 @@ public class JobStatistics implements JobStatisticsInterface {
         case FINISH_TIME: reduceT.setValue(ReduceTaskKeys.FINISH_TIME, value); break;
         case SHUFFLE_FINISHED: reduceT.setValue(ReduceTaskKeys.SHUFFLE_FINISH_TIME, value); break;
         case SORT_FINISHED: reduceT.setValue(ReduceTaskKeys.SORT_FINISH_TIME, value); break;
+        case SPLITS: reduceT.setValue(ReduceTaskKeys.SPLITS, value); break;
+        case TRACKER_NAME: reduceT.setValue(ReduceTaskKeys.TRACKER_NAME, value); break;
+        case STATE_STRING: reduceT.setValue(ReduceTaskKeys.STATE_STRING, value); break;
+        case HTTP_PORT: reduceT.setValue(ReduceTaskKeys.HTTP_PORT, value); break;
         case COUNTERS:
           value.concat(",");
           parseAndAddReduceTaskCounters(reduceT, value);
           reduceTaskList.add(reduceT);
           break;
-        default: System.out.println("JobHistory.Keys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE COUNTERS");
+        default: System.out.println("JobHistory.ReduceKeys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE COUNTERS");
           break;
         }
 
         // Add number of task attempts
         reduceT.setValue(ReduceTaskKeys.NUM_ATTEMPTS, (new Integer(task.getTaskAttempts().size())).toString());
       }
-    } else if (task.get(Keys.TASK_TYPE).equals("CLEANUP")) {
+    } else if (task.get(Keys.TASK_TYPE).equals("CLEANUP") ||
+               task.get(Keys.TASK_TYPE).equals("SETUP")) {
       //System.out.println("INFO: IGNORING TASK TYPE : "+task.get(Keys.TASK_TYPE));
     } else {
       System.out.println("UNKNOWN TASK TYPE : "+task.get(Keys.TASK_TYPE));
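
Note: widening the filter to cover SETUP as well as CLEANUP keeps framework
setup attempts out of the "UNKNOWN TASK TYPE" branch. As an aside, the
NUM_ATTEMPTS context line boxes through new Integer(...).toString(); an
equivalent, allocation-free form (a possible simplification, not part of the
patch) would be:

    reduceT.setValue(ReduceTaskKeys.NUM_ATTEMPTS,
                     String.valueOf(task.getTaskAttempts().size()));
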
@@ -275,6 +299,7 @@ public class JobStatistics implements JobStatisticsInterface {
       Map.Entry<JobHistory.Keys, String> entry = (Map.Entry<JobHistory.Keys, String>) kv.next();
       JobHistory.Keys key = entry.getKey();
       String value = entry.getValue();
+      //System.out.println("JobHistory.JobKeys."+key+": "+value);
       switch (key) {
       case JOBTRACKERID: job.put(JobKeys.JOBTRACKERID, value); break;
       //case START_TIME: job.put(JobKeys., value); break;
@@ -292,6 +317,7 @@ public class JobStatistics implements JobStatisticsInterface {
       case FINISHED_MAPS: job.put(JobKeys.FINISHED_MAPS, value); break;
       case FINISHED_REDUCES: job.put(JobKeys.FINISHED_REDUCES, value); break;
       case JOB_STATUS: job.put(JobKeys.STATUS, value); break;
+      case JOB_PRIORITY: job.put(JobKeys.JOB_PRIORITY, value); break;
       case COUNTERS:
         value.concat(",");
         parseAndAddJobCounters(job, value);
@@ -306,47 +332,59 @@ public class JobStatistics implements JobStatisticsInterface {
    * Parse and add the job counters
    */
   private void parseAndAddJobCounters(Hashtable<Enum, String> job, String counters) throws ParseException {
-    Matcher m = _pattern.matcher(counters);
-    while(m.find()){
-      String ctuple = m.group(0);
-      //String ctuple = c1tuple.substring(0, c1tuple.length()-1);
-      String []parts = ctuple.split(":");
-      if (parts[0].equals("File Systems.Local bytes read")) {
-        job.put(JobKeys.LOCAL_BYTES_READ, parts[1]);
-      } else if (parts[0].equals("File Systems.Local bytes written")) {
-        job.put(JobKeys.LOCAL_BYTES_WRITTEN, parts[1]);
-      } else if (parts[0].equals("File Systems.HDFS bytes read")) {
-        job.put(JobKeys.HDFS_BYTES_READ, parts[1]);
-      } else if (parts[0].equals("File Systems.HDFS bytes written")) {
-        job.put(JobKeys.HDFS_BYTES_WRITTEN, parts[1]);
-      } else if (parts[0].equals("Job Counters .Launched map tasks")) {
-        job.put(JobKeys.LAUNCHED_MAPS, parts[1]);
-      } else if (parts[0].equals("Job Counters .Launched reduce tasks")) {
-        job.put(JobKeys.LAUNCHED_REDUCES, parts[1]);
-      } else if (parts[0].equals("Job Counters .Data-local map tasks")) {
-        job.put(JobKeys.DATALOCAL_MAPS, parts[1]);
-      } else if (parts[0].equals("Job Counters .Rack-local map tasks")) {
-        job.put(JobKeys.RACKLOCAL_MAPS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Map input records")) {
-        job.put(JobKeys.MAP_INPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Map output records")) {
-        job.put(JobKeys.MAP_OUTPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Map input bytes")) {
-        job.put(JobKeys.MAP_INPUT_BYTES, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Map output bytes")) {
-        job.put(JobKeys.MAP_OUTPUT_BYTES, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
-        job.put(JobKeys.COMBINE_INPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
-        job.put(JobKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Reduce input groups")) {
-        job.put(JobKeys.REDUCE_INPUT_GROUPS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Reduce input records")) {
-        job.put(JobKeys.REDUCE_INPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Reduce output records")) {
-        job.put(JobKeys.REDUCE_OUTPUT_RECORDS, parts[1]);
-      } else {
-        System.out.println("Pattern:<"+ctuple+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR");
+    Counters cnt = Counters.fromEscapedCompactString(counters);
+    for (java.util.Iterator<Counters.Group> grps = cnt.iterator(); grps.hasNext(); ) {
+      Counters.Group grp = grps.next();
+      //String groupname = "<" + grp.getName() + ">::<" + grp.getDisplayName() + ">";
+      for (java.util.Iterator<Counters.Counter> mycounters = grp.iterator(); mycounters.hasNext(); ) {
+        Counters.Counter counter = mycounters.next();
+        //String countername = "<"+counter.getName()+">::<"+counter.getDisplayName()+">::<"+counter.getValue()+">";
+        //System.out.println("groupName:"+groupname+",countername: "+countername);
+        String countername = grp.getDisplayName()+"."+counter.getDisplayName();
+        String value = (new Long(counter.getValue())).toString();
+        String[] parts = {countername,value};
+        //System.out.println("part0:"+parts[0]+",:part1 "+parts[1]);
+        if (parts[0].equals("FileSystemCounters.FILE_BYTES_READ")) {
+          job.put(JobKeys.LOCAL_BYTES_READ, parts[1]);
+        } else if (parts[0].equals("FileSystemCounters.FILE_BYTES_WRITTEN")) {
+          job.put(JobKeys.LOCAL_BYTES_WRITTEN, parts[1]);
+        } else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_READ")) {
+          job.put(JobKeys.HDFS_BYTES_READ, parts[1]);
+        } else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_WRITTEN")) {
+          job.put(JobKeys.HDFS_BYTES_WRITTEN, parts[1]);
+        } else if (parts[0].equals("Job Counters .Launched map tasks")) {
+          job.put(JobKeys.LAUNCHED_MAPS, parts[1]);
+        } else if (parts[0].equals("Job Counters .Launched reduce tasks")) {
+          job.put(JobKeys.LAUNCHED_REDUCES, parts[1]);
+        } else if (parts[0].equals("Job Counters .Data-local map tasks")) {
+          job.put(JobKeys.DATALOCAL_MAPS, parts[1]);
+        } else if (parts[0].equals("Job Counters .Rack-local map tasks")) {
+          job.put(JobKeys.RACKLOCAL_MAPS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Map input records")) {
+          job.put(JobKeys.MAP_INPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Map output records")) {
+          job.put(JobKeys.MAP_OUTPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Map input bytes")) {
+          job.put(JobKeys.MAP_INPUT_BYTES, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Map output bytes")) {
+          job.put(JobKeys.MAP_OUTPUT_BYTES, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
+          job.put(JobKeys.COMBINE_INPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
+          job.put(JobKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Reduce input groups")) {
+          job.put(JobKeys.REDUCE_INPUT_GROUPS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Reduce input records")) {
+          job.put(JobKeys.REDUCE_INPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Reduce output records")) {
+          job.put(JobKeys.REDUCE_OUTPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Spilled Records")) {
+          job.put(JobKeys.SPILLED_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Reduce shuffle bytes")) {
+          job.put(JobKeys.SHUFFLE_BYTES, parts[1]);
+        } else {
+          System.out.println("JobCounterKey:<"+parts[0]+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR");
+        }
       }
     }
   }
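
For reference, the parsing approach all three rewritten methods share:
Counters.fromEscapedCompactString reconstructs a Counters object from the
escaped compact string stored in the job history file (and throws ParseException
on malformed input, hence the new throws clauses). The code walks groups and
counters via iterator(); assuming the usual Iterable contract behind those
calls, the same traversal can be written with enhanced for loops. A minimal
sketch, where "escaped" is a history counters string:

    import org.apache.hadoop.mapred.Counters;

    Counters cnt = Counters.fromEscapedCompactString(escaped);
    for (Counters.Group grp : cnt) {            // groups, e.g. "FileSystemCounters"
      for (Counters.Counter counter : grp) {    // counters within the group
        String name = grp.getDisplayName() + "." + counter.getDisplayName();
        long value = counter.getValue();
        // dispatch on name, e.g. "FileSystemCounters.FILE_BYTES_READ"
      }
    }
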
@@ -354,67 +392,89 @@ public class JobStatistics implements JobStatisticsInterface {
   /*
    * Parse and add the Map task counters
    */
-  private void parseAndAddMapTaskCounters(MapTaskStatistics mapTask, String counters) {
-    Matcher m = _pattern.matcher(counters);
-    while(m.find()){
-      String ctuple = m.group(0);
-      //String ctuple = c1tuple.substring(0, c1tuple.length()-1);
-      String []parts = ctuple.split(":");
-      if (parts[0].equals("File Systems.Local bytes read")) {
-        mapTask.setValue(MapTaskKeys.LOCAL_BYTES_READ, parts[1]);
-      } else if (parts[0].equals("File Systems.Local bytes written")) {
-        mapTask.setValue(MapTaskKeys.LOCAL_BYTES_WRITTEN, parts[1]);
-      } else if (parts[0].equals("File Systems.HDFS bytes read")) {
-        mapTask.setValue(MapTaskKeys.HDFS_BYTES_READ, parts[1]);
-      } else if (parts[0].equals("File Systems.HDFS bytes written")) {
-        mapTask.setValue(MapTaskKeys.HDFS_BYTES_WRITTEN, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Map input records")) {
-        mapTask.setValue(MapTaskKeys.INPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Map output records")) {
-        mapTask.setValue(MapTaskKeys.OUTPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Map input bytes")) {
-        mapTask.setValue(MapTaskKeys.INPUT_BYTES, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Map output bytes")) {
-        mapTask.setValue(MapTaskKeys.OUTPUT_BYTES, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
-        mapTask.setValue(MapTaskKeys.COMBINE_INPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
-        mapTask.setValue(MapTaskKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
-      } else {
-        System.out.println("Pattern:<"+ctuple+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR MAP TASK");
-      }
-    }
+  private void parseAndAddMapTaskCounters(MapTaskStatistics mapTask, String counters) throws ParseException {
+    Counters cnt = Counters.fromEscapedCompactString(counters);
+    for (java.util.Iterator<Counters.Group> grps = cnt.iterator(); grps.hasNext(); ) {
+      Counters.Group grp = grps.next();
+      //String groupname = "<" + grp.getName() + ">::<" + grp.getDisplayName() + ">";
+      for (java.util.Iterator<Counters.Counter> mycounters = grp.iterator(); mycounters.hasNext(); ) {
+        Counters.Counter counter = mycounters.next();
+        //String countername = "<"+counter.getName()+">::<"+counter.getDisplayName()+">::<"+counter.getValue()+">";
+        //System.out.println("groupName:"+groupname+",countername: "+countername);
+        String countername = grp.getDisplayName()+"."+counter.getDisplayName();
+        String value = (new Long(counter.getValue())).toString();
+        String[] parts = {countername,value};
+        //System.out.println("part0:"+parts[0]+",:part1 "+parts[1]);
+        if (parts[0].equals("FileSystemCounters.FILE_BYTES_READ")) {
+          mapTask.setValue(MapTaskKeys.LOCAL_BYTES_READ, parts[1]);
+        } else if (parts[0].equals("FileSystemCounters.FILE_BYTES_WRITTEN")) {
+          mapTask.setValue(MapTaskKeys.LOCAL_BYTES_WRITTEN, parts[1]);
+        } else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_READ")) {
+          mapTask.setValue(MapTaskKeys.HDFS_BYTES_READ, parts[1]);
+        } else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_WRITTEN")) {
+          mapTask.setValue(MapTaskKeys.HDFS_BYTES_WRITTEN, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Map input records")) {
+          mapTask.setValue(MapTaskKeys.INPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Map output records")) {
+          mapTask.setValue(MapTaskKeys.OUTPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Map input bytes")) {
+          mapTask.setValue(MapTaskKeys.INPUT_BYTES, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Map output bytes")) {
+          mapTask.setValue(MapTaskKeys.OUTPUT_BYTES, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
+          mapTask.setValue(MapTaskKeys.COMBINE_INPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
+          mapTask.setValue(MapTaskKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Spilled Records")) {
+          mapTask.setValue(MapTaskKeys.SPILLED_RECORDS, parts[1]);
+        } else {
+          System.out.println("MapCounterKey:<"+parts[0]+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR MAP TASK");
+        }
+      }
+    }
   }
 
   /*
    * Parse and add the reduce task counters
    */
-  private void parseAndAddReduceTaskCounters(ReduceTaskStatistics reduceTask, String counters) {
-    Matcher m = _pattern.matcher(counters);
-    while(m.find()){
-      String ctuple = m.group(0);
-      //String ctuple = c1tuple.substring(0, c1tuple.length()-1);
-      String []parts = ctuple.split(":");
-      if (parts[0].equals("File Systems.Local bytes read")) {
-        reduceTask.setValue(ReduceTaskKeys.LOCAL_BYTES_READ, parts[1]);
-      } else if (parts[0].equals("File Systems.Local bytes written")) {
-        reduceTask.setValue(ReduceTaskKeys.LOCAL_BYTES_WRITTEN, parts[1]);
-      } else if (parts[0].equals("File Systems.HDFS bytes read")) {
-        reduceTask.setValue(ReduceTaskKeys.HDFS_BYTES_READ, parts[1]);
-      } else if (parts[0].equals("File Systems.HDFS bytes written")) {
-        reduceTask.setValue(ReduceTaskKeys.HDFS_BYTES_WRITTEN, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Reduce input records")) {
-        reduceTask.setValue(ReduceTaskKeys.INPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Reduce output records")) {
-        reduceTask.setValue(ReduceTaskKeys.OUTPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
-        reduceTask.setValue(ReduceTaskKeys.COMBINE_INPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
-        reduceTask.setValue(ReduceTaskKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Reduce input groups")) {
-        reduceTask.setValue(ReduceTaskKeys.INPUT_GROUPS, parts[1]);
-      } else {
-        System.out.println("Pattern:<"+ctuple+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR MAP TASK");
+  private void parseAndAddReduceTaskCounters(ReduceTaskStatistics reduceTask, String counters) throws ParseException {
+    Counters cnt = Counters.fromEscapedCompactString(counters);
+    for (java.util.Iterator<Counters.Group> grps = cnt.iterator(); grps.hasNext(); ) {
+      Counters.Group grp = grps.next();
+      //String groupname = "<" + grp.getName() + ">::<" + grp.getDisplayName() + ">";
+      for (java.util.Iterator<Counters.Counter> mycounters = grp.iterator(); mycounters.hasNext(); ) {
+        Counters.Counter counter = mycounters.next();
+        //String countername = "<"+counter.getName()+">::<"+counter.getDisplayName()+">::<"+counter.getValue()+">";
+        //System.out.println("groupName:"+groupname+",countername: "+countername);
+        String countername = grp.getDisplayName()+"."+counter.getDisplayName();
+        String value = (new Long(counter.getValue())).toString();
+        String[] parts = {countername,value};
+        //System.out.println("part0:"+parts[0]+",:part1 "+parts[1]);
+        if (parts[0].equals("FileSystemCounters.FILE_BYTES_READ")) {
+          reduceTask.setValue(ReduceTaskKeys.LOCAL_BYTES_READ, parts[1]);
+        } else if (parts[0].equals("FileSystemCounters.FILE_BYTES_WRITTEN")) {
+          reduceTask.setValue(ReduceTaskKeys.LOCAL_BYTES_WRITTEN, parts[1]);
+        } else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_READ")) {
+          reduceTask.setValue(ReduceTaskKeys.HDFS_BYTES_READ, parts[1]);
+        } else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_WRITTEN")) {
+          reduceTask.setValue(ReduceTaskKeys.HDFS_BYTES_WRITTEN, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Reduce input records")) {
+          reduceTask.setValue(ReduceTaskKeys.INPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Reduce output records")) {
+          reduceTask.setValue(ReduceTaskKeys.OUTPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
+          reduceTask.setValue(ReduceTaskKeys.COMBINE_INPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
+          reduceTask.setValue(ReduceTaskKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Reduce input groups")) {
+          reduceTask.setValue(ReduceTaskKeys.INPUT_GROUPS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Spilled Records")) {
+          reduceTask.setValue(ReduceTaskKeys.SPILLED_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Reduce shuffle bytes")) {
+          reduceTask.setValue(ReduceTaskKeys.SHUFFLE_BYTES, parts[1]);
+        } else {
+          System.out.println("ReduceCounterKey:<"+parts[0]+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE TASK");
+        }
       }
     }
   }
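
The three rewritten parse methods differ only in their name-to-key mapping and
the object they write into. A hypothetical table-driven alternative (a sketch,
not part of the patch; JOB_COUNTER_KEYS is an invented name) would collapse each
if/else-if chain into a lookup:

    // Hypothetical helper, illustration only.
    private static final java.util.Map<String, Enum> JOB_COUNTER_KEYS =
        new java.util.HashMap<String, Enum>();
    static {
      JOB_COUNTER_KEYS.put("FileSystemCounters.FILE_BYTES_READ", JobKeys.LOCAL_BYTES_READ);
      JOB_COUNTER_KEYS.put("Map-Reduce Framework.Spilled Records", JobKeys.SPILLED_RECORDS);
      // ... remaining entries exactly as in parseAndAddJobCounters ...
    }

    // Inside the counter loop, the chain then becomes:
    Enum jobKey = JOB_COUNTER_KEYS.get(countername);
    if (jobKey != null) {
      job.put(jobKey, value);
    } else {
      System.out.println("JobCounterKey:<" + countername + "> ==> NOT INCLUDED IN PERFORMANCE ADVISOR");
    }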