|
@@ -18,6 +18,8 @@ package org.apache.hadoop.mapred;
|
|
|
import org.apache.hadoop.io.*;
|
|
|
|
|
|
import java.io.*;
|
|
|
+// enumeration for reporting current phase of a task.
|
|
|
+enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE};
|
|
|
|
|
|
/**************************************************
|
|
|
* Describes the current status of a task. This is
|
|
@@ -38,12 +40,22 @@ class TaskStatus implements Writable {
|
|
|
private String diagnosticInfo;
|
|
|
private String stateString;
|
|
|
private String taskTracker;
|
|
|
+
|
|
|
+ private long startTime ;
|
|
|
+ private long finishTime ;
|
|
|
+
|
|
|
+ // only for reduce tasks
|
|
|
+ private long shuffleFinishTime ;
|
|
|
+ private long sortFinishTime ;
|
|
|
+
|
|
|
+ private Phase phase = Phase.STARTING;
|
|
|
|
|
|
public TaskStatus() {}
|
|
|
|
|
|
public TaskStatus(String taskid, boolean isMap, float progress,
|
|
|
int runState, String diagnosticInfo,
|
|
|
- String stateString, String taskTracker) {
|
|
|
+ String stateString, String taskTracker,
|
|
|
+ Phase phase) {
|
|
|
this.taskid = taskid;
|
|
|
this.isMap = isMap;
|
|
|
this.progress = progress;
|
|
@@ -51,6 +63,7 @@ class TaskStatus implements Writable {
|
|
|
this.diagnosticInfo = diagnosticInfo;
|
|
|
this.stateString = stateString;
|
|
|
this.taskTracker = taskTracker;
|
|
|
+ this.phase = phase ;
|
|
|
}
|
|
|
|
|
|
public String getTaskId() { return taskid; }
|
|
@@ -65,7 +78,104 @@ class TaskStatus implements Writable {
|
|
|
public void setDiagnosticInfo(String info) { this.diagnosticInfo = info; }
|
|
|
public String getStateString() { return stateString; }
|
|
|
public void setStateString(String stateString) { this.stateString = stateString; }
|
|
|
+ /**
|
|
|
+ * Get task finish time. if shuffleFinishTime and sortFinishTime
|
|
|
+ * are not set before, these are set to finishTime. It takes care of
|
|
|
+ * the case when shuffle, sort and finish are completed with in the
|
|
|
+ * heartbeat interval and are not reported separately. if task state is
|
|
|
+ * TaskStatus.FAILED then finish time represents when the task failed.
|
|
|
+ * @return finish time of the task.
|
|
|
+ */
|
|
|
+ public long getFinishTime() {
|
|
|
+ return finishTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Sets finishTime.
|
|
|
+ * @param finishTime finish time of task.
|
|
|
+ */
|
|
|
+ void setFinishTime(long finishTime) {
|
|
|
+ if( shuffleFinishTime == 0 ) {
|
|
|
+ this.shuffleFinishTime = finishTime ;
|
|
|
+ }
|
|
|
+ if( sortFinishTime == 0 ){
|
|
|
+ this.sortFinishTime = finishTime ;
|
|
|
+ }
|
|
|
+ this.finishTime = finishTime;
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * Get shuffle finish time for the task. If shuffle finish time was
|
|
|
+ * not set due to shuffle/sort/finish phases ending within same
|
|
|
+ * heartbeat interval, it is set to finish time of next phase i.e. sort
|
|
|
+ * or task finish when these are set.
|
|
|
+ * @return 0 if shuffleFinishTime, sortFinishTime and finish time are not set. else
|
|
|
+ * it returns approximate shuffle finish time.
|
|
|
+ */
|
|
|
+ public long getShuffleFinishTime() {
|
|
|
+ return shuffleFinishTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Set shuffle finish time.
|
|
|
+ * @param shuffleFinishTime
|
|
|
+ */
|
|
|
+ void setShuffleFinishTime(long shuffleFinishTime) {
|
|
|
+ this.shuffleFinishTime = shuffleFinishTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get sort finish time for the task,. If sort finish time was not set
|
|
|
+ * due to sort and reduce phase finishing in same heartebat interval, it is
|
|
|
+ * set to finish time, when finish time is set.
|
|
|
+ * @return 0 if sort finish time and finish time are not set, else returns sort
|
|
|
+ * finish time if that is set, else it returns finish time.
|
|
|
+ */
|
|
|
+ public long getSortFinishTime() {
|
|
|
+ return sortFinishTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Sets sortFinishTime, if shuffleFinishTime is not set before
|
|
|
+ * then its set to sortFinishTime.
|
|
|
+ * @param sortFinishTime
|
|
|
+ */
|
|
|
+ void setSortFinishTime(long sortFinishTime) {
|
|
|
+ this.sortFinishTime = sortFinishTime;
|
|
|
+ if( 0 == this.shuffleFinishTime){
|
|
|
+ this.shuffleFinishTime = sortFinishTime ;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get start time of the task.
|
|
|
+ * @return 0 is start time is not set, else returns start time.
|
|
|
+ */
|
|
|
+ public long getStartTime() {
|
|
|
+ return startTime;
|
|
|
+ }
|
|
|
|
|
|
+ /**
|
|
|
+ * Set startTime of the task.
|
|
|
+ * @param startTime start time
|
|
|
+ */
|
|
|
+ void setStartTime(long startTime) {
|
|
|
+ this.startTime = startTime;
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * Get current phase of this task. Phase.Map in case of map tasks,
|
|
|
+ * for reduce one of Phase.SHUFFLE, Phase.SORT or Phase.REDUCE.
|
|
|
+ * @return .
|
|
|
+ */
|
|
|
+ public Phase getPhase(){
|
|
|
+ return this.phase;
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * Set current phase of this task.
|
|
|
+ * @param p
|
|
|
+ */
|
|
|
+ void setPhase(Phase p){
|
|
|
+ this.phase = p ;
|
|
|
+ }
|
|
|
//////////////////////////////////////////////
|
|
|
// Writable
|
|
|
//////////////////////////////////////////////
|
|
@@ -76,6 +186,13 @@ class TaskStatus implements Writable {
|
|
|
out.writeInt(runState);
|
|
|
UTF8.writeString(out, diagnosticInfo);
|
|
|
UTF8.writeString(out, stateString);
|
|
|
+ WritableUtils.writeEnum(out, phase);
|
|
|
+ out.writeLong(startTime);
|
|
|
+ out.writeLong(finishTime);
|
|
|
+ if(! isMap){
|
|
|
+ out.writeLong(shuffleFinishTime);
|
|
|
+ out.writeLong(sortFinishTime);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public void readFields(DataInput in) throws IOException {
|
|
@@ -85,5 +202,13 @@ class TaskStatus implements Writable {
|
|
|
this.runState = in.readInt();
|
|
|
this.diagnosticInfo = UTF8.readString(in);
|
|
|
this.stateString = UTF8.readString(in);
|
|
|
- }
|
|
|
+ this.phase = WritableUtils.readEnum(in, Phase.class);
|
|
|
+ this.startTime = in.readLong();
|
|
|
+ this.finishTime = in.readLong() ;
|
|
|
+ if( ! this.isMap ){
|
|
|
+ shuffleFinishTime = in.readLong();
|
|
|
+ sortFinishTime = in.readLong();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
+
|