|
@@ -18,7 +18,21 @@
|
|
|
|
|
|
package org.apache.hadoop.mapreduce.v2.app;
|
|
|
|
|
|
-import com.google.common.annotations.VisibleForTesting;
|
|
|
+import java.io.IOException;
|
|
|
+import java.lang.reflect.Constructor;
|
|
|
+import java.lang.reflect.InvocationTargetException;
|
|
|
+import java.security.PrivilegedExceptionAction;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Iterator;
|
|
|
+import java.util.LinkedList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Map.Entry;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+
|
|
|
import org.apache.commons.io.IOUtils;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -27,20 +41,37 @@ import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.http.HttpConfig;
|
|
|
-import org.apache.hadoop.mapred.*;
|
|
|
-import org.apache.hadoop.mapreduce.*;
|
|
|
+import org.apache.hadoop.mapred.FileOutputCommitter;
|
|
|
+import org.apache.hadoop.mapred.JobConf;
|
|
|
+import org.apache.hadoop.mapred.LocalContainerLauncher;
|
|
|
+import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
|
|
|
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
|
|
|
+import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
|
import org.apache.hadoop.mapreduce.OutputCommitter;
|
|
|
import org.apache.hadoop.mapreduce.OutputFormat;
|
|
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
|
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
|
|
-import org.apache.hadoop.mapreduce.jobhistory.*;
|
|
|
+import org.apache.hadoop.mapreduce.TypeConverter;
|
|
|
+import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
|
|
|
+import org.apache.hadoop.mapreduce.jobhistory.EventReader;
|
|
|
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
|
|
|
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
|
|
|
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryCopyService;
|
|
|
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
|
|
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
|
|
|
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
|
|
|
import org.apache.hadoop.mapreduce.security.TokenCache;
|
|
|
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
|
|
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
|
|
-import org.apache.hadoop.mapreduce.v2.api.records.*;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
|
|
@@ -51,14 +82,26 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
|
|
|
-import org.apache.hadoop.mapreduce.v2.app.job.event.*;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
|
|
|
-import org.apache.hadoop.mapreduce.v2.app.rm.*;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
|
|
@@ -95,14 +138,7 @@ import org.apache.hadoop.yarn.util.Clock;
|
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
|
import org.apache.hadoop.yarn.util.SystemClock;
|
|
|
|
|
|
-import java.io.IOException;
|
|
|
-import java.lang.reflect.Constructor;
|
|
|
-import java.lang.reflect.InvocationTargetException;
|
|
|
-import java.security.PrivilegedExceptionAction;
|
|
|
-import java.util.*;
|
|
|
-import java.util.Map.Entry;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
-import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
|
|
|
/**
|
|
|
* The Map-Reduce Application Master.
|
|
@@ -166,7 +202,8 @@ public class MRAppMaster extends CompositeService {
|
|
|
private Credentials jobCredentials = new Credentials(); // Filled during init
|
|
|
protected UserGroupInformation currentUser; // Will be setup during init
|
|
|
|
|
|
- private volatile boolean isLastAMRetry = false;
|
|
|
+ @VisibleForTesting
|
|
|
+ protected volatile boolean isLastAMRetry = false;
|
|
|
//Something happened and we should shut down right after we start up.
|
|
|
boolean errorHappenedShutDown = false;
|
|
|
private String shutDownMessage = null;
|
|
@@ -175,7 +212,7 @@ public class MRAppMaster extends CompositeService {
|
|
|
private long recoveredJobStartTime = 0;
|
|
|
|
|
|
@VisibleForTesting
|
|
|
- protected AtomicBoolean safeToReportTerminationToUser =
|
|
|
+ protected AtomicBoolean successfullyUnregistered =
|
|
|
new AtomicBoolean(false);
|
|
|
|
|
|
public MRAppMaster(ApplicationAttemptId applicationAttemptId,
|
|
@@ -208,14 +245,14 @@ public class MRAppMaster extends CompositeService {
|
|
|
|
|
|
initJobCredentialsAndUGI(conf);
|
|
|
|
|
|
- isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
|
|
|
+ context = new RunningAppContext(conf);
|
|
|
+
|
|
|
+ ((RunningAppContext)context).computeIsLastAMRetry();
|
|
|
LOG.info("The specific max attempts: " + maxAppAttempts +
|
|
|
" for application: " + appAttemptID.getApplicationId().getId() +
|
|
|
". Attempt num: " + appAttemptID.getAttemptId() +
|
|
|
" is last retry: " + isLastAMRetry);
|
|
|
|
|
|
- context = new RunningAppContext(conf);
|
|
|
-
|
|
|
// Job name is the same as the app name util we support DAG of jobs
|
|
|
// for an app later
|
|
|
appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
|
|
@@ -511,11 +548,6 @@ public class MRAppMaster extends CompositeService {
|
|
|
MRAppMaster.this.stop();
|
|
|
|
|
|
if (isLastAMRetry) {
|
|
|
- // Except ClientService, other services are already stopped, it is safe to
|
|
|
- // let clients know the final states. ClientService should wait for some
|
|
|
- // time so clients have enough time to know the final states.
|
|
|
- safeToReportTerminationToUser.set(true);
|
|
|
-
|
|
|
// Send job-end notification when it is safe to report termination to
|
|
|
// users and it is the last AM retry
|
|
|
if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
|
|
@@ -524,7 +556,14 @@ public class MRAppMaster extends CompositeService {
|
|
|
+ job.getReport().getJobId());
|
|
|
JobEndNotifier notifier = new JobEndNotifier();
|
|
|
notifier.setConf(getConfig());
|
|
|
- notifier.notify(job.getReport());
|
|
|
+ JobReport report = job.getReport();
|
|
|
+ // If unregistration fails, the final state is unavailable. However,
|
|
|
+ // at the last AM Retry, the client will finally be notified FAILED
|
|
|
+ // from RM, so we should let users know FAILED via notifier as well
|
|
|
+ if (!context.hasSuccessfullyUnregistered()) {
|
|
|
+ report.setJobState(JobState.FAILED);
|
|
|
+ }
|
|
|
+ notifier.notify(report);
|
|
|
} catch (InterruptedException ie) {
|
|
|
LOG.warn("Job end notification interrupted for jobID : "
|
|
|
+ job.getReport().getJobId(), ie);
|
|
@@ -863,7 +902,7 @@ public class MRAppMaster extends CompositeService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private class RunningAppContext implements AppContext {
|
|
|
+ public class RunningAppContext implements AppContext {
|
|
|
|
|
|
private final Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
|
|
|
private final Configuration conf;
|
|
@@ -942,8 +981,16 @@ public class MRAppMaster extends CompositeService {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public boolean safeToReportTerminationToUser() {
|
|
|
- return safeToReportTerminationToUser.get();
|
|
|
+ public boolean hasSuccessfullyUnregistered() {
|
|
|
+ return successfullyUnregistered.get();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void markSuccessfulUnregistration() {
|
|
|
+ successfullyUnregistered.set(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void computeIsLastAMRetry() {
|
|
|
+ isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
|
|
|
}
|
|
|
}
|
|
|
|