|
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapred.SortedRanges.Range;
|
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
|
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
|
|
+import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
|
|
@@ -56,6 +57,7 @@ import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
|
|
import org.apache.hadoop.service.CompositeService;
|
|
|
import org.apache.hadoop.util.StringInterner;
|
|
|
+import org.apache.hadoop.util.Time;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
@@ -89,6 +91,11 @@ public class TaskAttemptListenerImpl extends CompositeService
|
|
|
private ConcurrentMap<TaskAttemptId,
|
|
|
AtomicReference<TaskAttemptStatus>> attemptIdToStatus
|
|
|
= new ConcurrentHashMap<>();
|
|
|
+ /**
|
|
|
+ * A Map to keep track of the history of logging each task attempt.
|
|
|
+ */
|
|
|
+ private ConcurrentHashMap<TaskAttemptID, TaskProgressLogPair>
|
|
|
+ taskAttemptLogProgressStamps = new ConcurrentHashMap<>();
|
|
|
|
|
|
private Set<WrappedJvmID> launchedJVMs = Collections
|
|
|
.newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>());
|
|
@@ -110,10 +117,12 @@ public class TaskAttemptListenerImpl extends CompositeService
|
|
|
|
|
|
@Override
|
|
|
protected void serviceInit(Configuration conf) throws Exception {
|
|
|
- registerHeartbeatHandler(conf);
|
|
|
- commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
|
|
|
- MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
|
|
|
- super.serviceInit(conf);
|
|
|
+ registerHeartbeatHandler(conf);
|
|
|
+ commitWindowMs = conf.getLong(MRJobConfig.MR_AM_COMMIT_WINDOW_MS,
|
|
|
+ MRJobConfig.DEFAULT_MR_AM_COMMIT_WINDOW_MS);
|
|
|
+ // initialize the delta threshold for logging the task progress.
|
|
|
+ MRJobConfUtil.setTaskLogProgressDeltaThresholds(conf);
|
|
|
+ super.serviceInit(conf);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -164,6 +173,9 @@ public class TaskAttemptListenerImpl extends CompositeService
|
|
|
@Override
|
|
|
protected void serviceStop() throws Exception {
|
|
|
stopRpcServer();
|
|
|
+ if (taskAttemptLogProgressStamps != null) {
|
|
|
+ taskAttemptLogProgressStamps.clear();
|
|
|
+ }
|
|
|
super.serviceStop();
|
|
|
}
|
|
|
|
|
@@ -359,8 +371,15 @@ public class TaskAttemptListenerImpl extends CompositeService
|
|
|
taskAttemptStatus.id = yarnAttemptID;
|
|
|
// Task sends the updated progress to the TT.
|
|
|
taskAttemptStatus.progress = taskStatus.getProgress();
|
|
|
- LOG.info("Progress of TaskAttempt " + taskAttemptID + " is : "
|
|
|
- + taskStatus.getProgress());
|
|
|
+ // log the new progress
|
|
|
+ TaskProgressLogPair logPair =
|
|
|
+ taskAttemptLogProgressStamps.get(taskAttemptID);
|
|
|
+ if (logPair == null) {
|
|
|
+ taskAttemptLogProgressStamps.putIfAbsent(taskAttemptID,
|
|
|
+ new TaskProgressLogPair(taskAttemptID));
|
|
|
+ logPair = taskAttemptLogProgressStamps.get(taskAttemptID);
|
|
|
+ }
|
|
|
+ logPair.update(taskStatus.getProgress());
|
|
|
// Task sends the updated state-string to the TT.
|
|
|
taskAttemptStatus.stateString = taskStatus.getStateString();
|
|
|
// Task sends the updated phase to the TT.
|
|
@@ -574,4 +593,67 @@ public class TaskAttemptListenerImpl extends CompositeService
|
|
|
AtomicReference<TaskAttemptStatus>> getAttemptIdToStatus() {
|
|
|
return attemptIdToStatus;
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Entity to keep track of the taskAttempt, last time it was logged,
|
|
|
+ * and the progress that has been logged.
|
|
|
+ */
|
|
|
+ class TaskProgressLogPair {
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The taskAttemptId of that history record.
|
|
|
+ */
|
|
|
+ private final TaskAttemptID taskAttemptID;
|
|
|
+ /**
|
|
|
+ * Timestamp of last time the progress was logged.
|
|
|
+ */
|
|
|
+ private volatile long logTimeStamp;
|
|
|
+ /**
|
|
|
+ * Snapshot of the last logged progress.
|
|
|
+ */
|
|
|
+ private volatile double prevProgress;
|
|
|
+
|
|
|
+ TaskProgressLogPair(final TaskAttemptID attemptID) {
|
|
|
+ taskAttemptID = attemptID;
|
|
|
+ prevProgress = 0.0;
|
|
|
+ logTimeStamp = 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void resetLog(final boolean doLog,
|
|
|
+ final float progress, final double processedProgress,
|
|
|
+ final long timestamp) {
|
|
|
+ if (doLog) {
|
|
|
+ prevProgress = processedProgress;
|
|
|
+ logTimeStamp = timestamp;
|
|
|
+ LOG.info("Progress of TaskAttempt " + taskAttemptID + " is : "
|
|
|
+ + progress);
|
|
|
+ } else {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Progress of TaskAttempt " + taskAttemptID + " is : "
|
|
|
+ + progress);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void update(final float progress) {
|
|
|
+ final double processedProgress =
|
|
|
+ MRJobConfUtil.convertTaskProgressToFactor(progress);
|
|
|
+ final double diffProgress = processedProgress - prevProgress;
|
|
|
+ final long currentTime = Time.monotonicNow();
|
|
|
+ boolean result =
|
|
|
+ (Double.compare(diffProgress,
|
|
|
+ MRJobConfUtil.getTaskProgressMinDeltaThreshold()) >= 0);
|
|
|
+ if (!result) {
|
|
|
+ // check if time has expired.
|
|
|
+ result = ((currentTime - logTimeStamp)
|
|
|
+ >= MRJobConfUtil.getTaskProgressWaitDeltaTimeThreshold());
|
|
|
+ }
|
|
|
+ // It is helpful to log the progress when it reaches 1.0F.
|
|
|
+ if (Float.compare(progress, 1.0f) == 0) {
|
|
|
+ result = true;
|
|
|
+ taskAttemptLogProgressStamps.remove(taskAttemptID);
|
|
|
+ }
|
|
|
+ resetLog(result, progress, processedProgress, currentTime);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|