|
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
|
|
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
|
|
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
|
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
|
|
|
+import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
|
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
|
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
|
@@ -58,6 +59,7 @@ import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
|
import org.apache.hadoop.service.CompositeService;
|
|
import org.apache.hadoop.service.CompositeService;
|
|
import org.apache.hadoop.util.StringInterner;
|
|
import org.apache.hadoop.util.StringInterner;
|
|
|
|
+import org.apache.hadoop.util.Time;
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -94,6 +96,12 @@ public class TaskAttemptListenerImpl extends CompositeService
|
|
AtomicReference<TaskAttemptStatus>> attemptIdToStatus
|
|
AtomicReference<TaskAttemptStatus>> attemptIdToStatus
|
|
= new ConcurrentHashMap<>();
|
|
= new ConcurrentHashMap<>();
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * A Map to keep track of the history of logging each task attempt.
|
|
|
|
+ */
|
|
|
|
+ private ConcurrentHashMap<TaskAttemptID, TaskProgressLogPair>
|
|
|
|
+ taskAttemptLogProgressStamps = new ConcurrentHashMap<>();
|
|
|
|
+
|
|
private Set<WrappedJvmID> launchedJVMs = Collections
|
|
private Set<WrappedJvmID> launchedJVMs = Collections
|
|
.newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>());
|
|
.newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>());
|
|
|
|
|
|
@@ -123,10 +131,12 @@ public class TaskAttemptListenerImpl extends CompositeService
|
|
|
|
|
|
@Override
|
|
@Override
|
|
protected void serviceInit(Configuration conf) throws Exception {
|
|
protected void serviceInit(Configuration conf) throws Exception {
|
|
- registerHeartbeatHandler(conf);
|
|
|
|
- commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
|
|
|
|
- MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
|
|
|
|
- super.serviceInit(conf);
|
|
|
|
|
|
+ registerHeartbeatHandler(conf);
|
|
|
|
+ commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
|
|
|
|
+ MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
|
|
|
|
+ // initialize the delta threshold for logging the task progress.
|
|
|
|
+ MRJobConfUtil.setTaskLogProgressDeltaThresholds(conf);
|
|
|
|
+ super.serviceInit(conf);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -410,8 +420,10 @@ public class TaskAttemptListenerImpl extends CompositeService
|
|
taskAttemptStatus.id = yarnAttemptID;
|
|
taskAttemptStatus.id = yarnAttemptID;
|
|
// Task sends the updated progress to the TT.
|
|
// Task sends the updated progress to the TT.
|
|
taskAttemptStatus.progress = taskStatus.getProgress();
|
|
taskAttemptStatus.progress = taskStatus.getProgress();
|
|
- LOG.info("Progress of TaskAttempt " + taskAttemptID + " is : "
|
|
|
|
- + taskStatus.getProgress());
|
|
|
|
|
|
+ // log the new progress
|
|
|
|
+ taskAttemptLogProgressStamps.computeIfAbsent(taskAttemptID,
|
|
|
|
+ k -> new TaskProgressLogPair(taskAttemptID))
|
|
|
|
+ .update(taskStatus.getProgress());
|
|
// Task sends the updated state-string to the TT.
|
|
// Task sends the updated state-string to the TT.
|
|
taskAttemptStatus.stateString = taskStatus.getStateString();
|
|
taskAttemptStatus.stateString = taskStatus.getStateString();
|
|
// Task sends the updated phase to the TT.
|
|
// Task sends the updated phase to the TT.
|
|
@@ -637,4 +649,68 @@ public class TaskAttemptListenerImpl extends CompositeService
|
|
AtomicReference<TaskAttemptStatus>> getAttemptIdToStatus() {
|
|
AtomicReference<TaskAttemptStatus>> getAttemptIdToStatus() {
|
|
return attemptIdToStatus;
|
|
return attemptIdToStatus;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Entity to keep track of the taskAttempt, last time it was logged,
|
|
|
|
+ * and the
|
|
|
|
+ * progress that has been logged.
|
|
|
|
+ */
|
|
|
|
+ class TaskProgressLogPair {
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * The taskAttemptId of that history record.
|
|
|
|
+ */
|
|
|
|
+ private final TaskAttemptID taskAttemptID;
|
|
|
|
+ /**
|
|
|
|
+ * Timestamp of last time the progress was logged.
|
|
|
|
+ */
|
|
|
|
+ private volatile long logTimeStamp;
|
|
|
|
+ /**
|
|
|
|
+ * Snapshot of the last logged progress.
|
|
|
|
+ */
|
|
|
|
+ private volatile double prevProgress;
|
|
|
|
+
|
|
|
|
+ TaskProgressLogPair(final TaskAttemptID attemptID) {
|
|
|
|
+ taskAttemptID = attemptID;
|
|
|
|
+ prevProgress = 0.0;
|
|
|
|
+ logTimeStamp = 0;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void resetLog(final boolean doLog,
|
|
|
|
+ final float progress, final double processedProgress,
|
|
|
|
+ final long timestamp) {
|
|
|
|
+ if (doLog) {
|
|
|
|
+ prevProgress = processedProgress;
|
|
|
|
+ logTimeStamp = timestamp;
|
|
|
|
+ LOG.info("Progress of TaskAttempt " + taskAttemptID + " is : "
|
|
|
|
+ + progress);
|
|
|
|
+ } else {
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("Progress of TaskAttempt " + taskAttemptID + " is : "
|
|
|
|
+ + progress);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void update(final float progress) {
|
|
|
|
+ final double processedProgress =
|
|
|
|
+ MRJobConfUtil.convertTaskProgressToFactor(progress);
|
|
|
|
+ final double diffProgress = processedProgress - prevProgress;
|
|
|
|
+ final long currentTime = Time.monotonicNow();
|
|
|
|
+ boolean result =
|
|
|
|
+ (Double.compare(diffProgress,
|
|
|
|
+ MRJobConfUtil.getTaskProgressMinDeltaThreshold()) >= 0);
|
|
|
|
+ if (!result) {
|
|
|
|
+ // check if time has expired.
|
|
|
|
+ result = ((currentTime - logTimeStamp)
|
|
|
|
+ >= MRJobConfUtil.getTaskProgressWaitDeltaTimeThreshold());
|
|
|
|
+ }
|
|
|
|
+ // It is helpful to log the progress when it reaches 1.0F.
|
|
|
|
+ if (Float.compare(progress, 1.0f) == 0) {
|
|
|
|
+ result = true;
|
|
|
|
+ taskAttemptLogProgressStamps.remove(taskAttemptID);
|
|
|
|
+ }
|
|
|
|
+ resetLog(result, progress, processedProgress, currentTime);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|