|
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapred;
|
|
|
import java.io.IOException;
|
|
|
import java.lang.reflect.Method;
|
|
|
import java.security.PrivilegedAction;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -50,7 +51,7 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
|
|
-import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
|
|
|
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.security.SecurityInfo;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
@@ -61,24 +62,20 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationState;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
|
|
+import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
|
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
|
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
|
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
|
|
|
-import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
|
|
|
|
|
|
class ClientServiceDelegate {
|
|
|
private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
|
|
|
- private static final NotRunningJob NOTSTARTEDJOB =
|
|
|
- new NotRunningJob(JobState.NEW);
|
|
|
-
|
|
|
- private static final NotRunningJob FAILEDJOB =
|
|
|
- new NotRunningJob(JobState.FAILED);
|
|
|
-
|
|
|
- private static final NotRunningJob KILLEDJOB =
|
|
|
- new NotRunningJob(JobState.KILLED);
|
|
|
+
|
|
|
+ // Caches for per-user NotRunningJobs
|
|
|
+ private static HashMap<JobState, HashMap<String, NotRunningJob>> notRunningJobs =
|
|
|
+ new HashMap<JobState, HashMap<String, NotRunningJob>>();
|
|
|
|
|
|
private final Configuration conf;
|
|
|
private final JobID jobId;
|
|
@@ -101,6 +98,24 @@ class ClientServiceDelegate {
|
|
|
this.appId = TypeConverter.toYarn(jobId).getAppId();
|
|
|
}
|
|
|
|
|
|
+ // Get the instance of the NotRunningJob corresponding to the specified
|
|
|
+ // user and state
|
|
|
+ private NotRunningJob getNotRunningJob(String user, JobState state) {
|
|
|
+ synchronized (notRunningJobs) {
|
|
|
+ HashMap<String, NotRunningJob> map = notRunningJobs.get(state);
|
|
|
+ if (map == null) {
|
|
|
+ map = new HashMap<String, NotRunningJob>();
|
|
|
+ notRunningJobs.put(state, map);
|
|
|
+ }
|
|
|
+ NotRunningJob notRunningJob = map.get(user);
|
|
|
+ if (notRunningJob == null) {
|
|
|
+ notRunningJob = new NotRunningJob(user, state);
|
|
|
+ map.put(user, notRunningJob);
|
|
|
+ }
|
|
|
+ return notRunningJob;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private MRClientProtocol getProxy() throws YarnRemoteException {
|
|
|
if (!forceRefresh && realProxy != null) {
|
|
|
return realProxy;
|
|
@@ -149,26 +164,30 @@ class ClientServiceDelegate {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /** we just want to return if its allocating, so that we dont
|
|
|
+ /** we just want to return if its allocating, so that we don't
|
|
|
* block on it. This is to be able to return job status
|
|
|
- * on a allocating Application.
|
|
|
+ * on an allocating Application.
|
|
|
*/
|
|
|
|
|
|
+ String user = application.getUser();
|
|
|
+ if (user == null) {
|
|
|
+ throw new YarnRemoteExceptionPBImpl("User is not set in the application report");
|
|
|
+ }
|
|
|
if (application.getState() == ApplicationState.NEW ||
|
|
|
application.getState() == ApplicationState.SUBMITTED) {
|
|
|
realProxy = null;
|
|
|
- return NOTSTARTEDJOB;
|
|
|
+ return getNotRunningJob(user, JobState.NEW);
|
|
|
}
|
|
|
|
|
|
if (application.getState() == ApplicationState.FAILED) {
|
|
|
realProxy = null;
|
|
|
- return FAILEDJOB;
|
|
|
+ return getNotRunningJob(user, JobState.FAILED);
|
|
|
}
|
|
|
|
|
|
if (application.getState() == ApplicationState.KILLED) {
|
|
|
- realProxy = null;
|
|
|
- return KILLEDJOB;
|
|
|
- }
|
|
|
+ realProxy = null;
|
|
|
+ return getNotRunningJob(user, JobState.KILLED);
|
|
|
+ }
|
|
|
|
|
|
//History server can serve a job only if application
|
|
|
//succeeded.
|
|
@@ -270,17 +289,15 @@ class ClientServiceDelegate {
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
- JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException,
|
|
|
- YarnRemoteException {
|
|
|
+ JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
|
|
|
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
|
|
|
TypeConverter.toYarn(oldJobID);
|
|
|
- String stagingDir = conf.get("yarn.apps.stagingDir");
|
|
|
- String jobFile = stagingDir + "/" + jobId.toString();
|
|
|
- MRClientProtocol protocol;
|
|
|
GetJobReportRequest request = recordFactory.newRecordInstance(GetJobReportRequest.class);
|
|
|
request.setJobId(jobId);
|
|
|
- JobReport report = ((GetJobReportResponse) invoke("getJobReport",
|
|
|
+ JobReport report = ((GetJobReportResponse) invoke("getJobReport",
|
|
|
GetJobReportRequest.class, request)).getJobReport();
|
|
|
+ String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID);
|
|
|
+
|
|
|
//TODO: add tracking url in JobReport
|
|
|
return TypeConverter.fromYarn(report, jobFile, "");
|
|
|
}
|