|
@@ -19,6 +19,7 @@
|
|
|
package org.apache.hadoop.mapred;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.net.UnknownHostException;
|
|
|
import java.security.PrivilegedAction;
|
|
|
import java.util.List;
|
|
|
|
|
@@ -40,11 +41,11 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
|
|
|
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
|
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
|
|
|
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.security.SecurityInfo;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
@@ -98,6 +99,9 @@ public class ClientServiceDelegate {
|
|
|
}
|
|
|
|
|
|
private void refreshProxy() throws YarnRemoteException {
|
|
|
+ //TODO RM NPEs for unknown jobs. History may still be aware.
|
|
|
+ // Possibly allow nulls through the PB tunnel, otherwise deal with an exception
|
|
|
+ // and redirect to the history server.
|
|
|
ApplicationMaster appMaster = rm.getApplicationMaster(currentAppId);
|
|
|
while (!ApplicationState.COMPLETED.equals(appMaster.getState()) &&
|
|
|
!ApplicationState.FAILED.equals(appMaster.getState()) &&
|
|
@@ -160,8 +164,12 @@ public class ClientServiceDelegate {
|
|
|
JHConfig.DEFAULT_HS_BIND_ADDRESS);
|
|
|
LOG.info("Application state is completed. " +
|
|
|
"Redirecting to job history server " + serviceAddr);
|
|
|
- //TODO:
|
|
|
- serviceHttpAddr = "";
|
|
|
+ try {
|
|
|
+ serviceHttpAddr = JobHistoryUtils.getHistoryUrl(conf, currentAppId);
|
|
|
+ } catch (UnknownHostException e) {
|
|
|
+ LOG.warn("Unable to get history url", e);
|
|
|
+ serviceHttpAddr = "UNKNOWN";
|
|
|
+ }
|
|
|
try {
|
|
|
instantiateHistoryProxy(serviceAddr);
|
|
|
return;
|
|
@@ -222,7 +230,12 @@ public class ClientServiceDelegate {
|
|
|
try {
|
|
|
GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
|
|
|
request.setJobId(jobID);
|
|
|
- return TypeConverter.fromYarn(getRefreshedProxy(arg0).getCounters(request).getCounters());
|
|
|
+ MRClientProtocol protocol = getRefreshedProxy(arg0);
|
|
|
+ if (protocol == null) {
|
|
|
+ /* no History to connect to, fake counters */
|
|
|
+ return new org.apache.hadoop.mapreduce.Counters();
|
|
|
+ }
|
|
|
+ return TypeConverter.fromYarn(protocol.getCounters(request).getCounters());
|
|
|
} catch(YarnRemoteException yre) {
|
|
|
LOG.warn(RPCUtil.toString(yre));
|
|
|
throw yre;
|
|
@@ -231,8 +244,7 @@ public class ClientServiceDelegate {
|
|
|
}
|
|
|
|
|
|
public String getJobHistoryDir() throws IOException, InterruptedException {
|
|
|
- //TODO fix this
|
|
|
- return "";
|
|
|
+ return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
|
|
|
}
|
|
|
|
|
|
public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
|
|
@@ -266,7 +278,7 @@ public class ClientServiceDelegate {
|
|
|
if (protocol == null) {
|
|
|
return new TaskCompletionEvent[0];
|
|
|
}
|
|
|
- list = getRefreshedProxy(arg0).getTaskAttemptCompletionEvents(request).getCompletionEventList();
|
|
|
+ list = protocol.getTaskAttemptCompletionEvents(request).getCompletionEventList();
|
|
|
} catch(YarnRemoteException yre) {
|
|
|
LOG.warn(RPCUtil.toString(yre));
|
|
|
throw yre;
|
|
@@ -284,8 +296,13 @@ public class ClientServiceDelegate {
|
|
|
List<String> list = null;
|
|
|
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter.toYarn(arg0);
|
|
|
GetDiagnosticsRequest request = recordFactory.newRecordInstance(GetDiagnosticsRequest.class);
|
|
|
+ MRClientProtocol protocol;
|
|
|
try {
|
|
|
request.setTaskAttemptId(attemptID);
|
|
|
+ protocol = getProxy(arg0.getJobID());
|
|
|
+ if (protocol == null) {
|
|
|
+ return new String[0];
|
|
|
+ }
|
|
|
list = getProxy(arg0.getJobID()).getDiagnostics(request).getDiagnosticsList();
|
|
|
} catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
|
|
|
LOG.warn(RPCUtil.toString(yre));
|
|
@@ -293,8 +310,11 @@ public class ClientServiceDelegate {
|
|
|
} catch(Exception e) {
|
|
|
LOG.debug("Failed to contact application master ", e);
|
|
|
try {
|
|
|
- request.setTaskAttemptId(attemptID);
|
|
|
- list = getRefreshedProxy(arg0.getJobID()).getDiagnostics(request).getDiagnosticsList();
|
|
|
+ protocol = getRefreshedProxy(arg0.getJobID());
|
|
|
+ if (protocol == null) {
|
|
|
+ return new String[0];
|
|
|
+ }
|
|
|
+ list = protocol.getDiagnostics(request).getDiagnosticsList();
|
|
|
} catch(YarnRemoteException yre) {
|
|
|
LOG.warn(RPCUtil.toString(yre));
|
|
|
throw yre;
|
|
@@ -361,7 +381,7 @@ public class ClientServiceDelegate {
|
|
|
if (protocol == null) {
|
|
|
return createFakeJobReport(currentAppState, jobId, jobFile);
|
|
|
}
|
|
|
- report = getRefreshedProxy(oldJobID).getJobReport(request).getJobReport();
|
|
|
+ report = protocol.getJobReport(request).getJobReport();
|
|
|
} catch(YarnRemoteException yre) {
|
|
|
LOG.warn(RPCUtil.toString(yre));
|
|
|
throw yre;
|
|
@@ -375,9 +395,14 @@ public class ClientServiceDelegate {
|
|
|
List<org.apache.hadoop.mapreduce.v2.api.records.TaskReport> taskReports = null;
|
|
|
org.apache.hadoop.mapreduce.v2.api.records.JobId nJobID = TypeConverter.toYarn(jobID);
|
|
|
GetTaskReportsRequest request = recordFactory.newRecordInstance(GetTaskReportsRequest.class);
|
|
|
+ MRClientProtocol protocol = null;
|
|
|
try {
|
|
|
request.setJobId(nJobID);
|
|
|
request.setTaskType(TypeConverter.toYarn(taskType));
|
|
|
+ protocol = getProxy(jobID);
|
|
|
+ if (protocol == null) {
|
|
|
+ return new org.apache.hadoop.mapreduce.TaskReport[0];
|
|
|
+ }
|
|
|
taskReports = getProxy(jobID).getTaskReports(request).getTaskReportList();
|
|
|
} catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
|
|
|
LOG.warn(RPCUtil.toString(yre));
|
|
@@ -387,7 +412,11 @@ public class ClientServiceDelegate {
|
|
|
try {
|
|
|
request.setJobId(nJobID);
|
|
|
request.setTaskType(TypeConverter.toYarn(taskType));
|
|
|
- taskReports = getRefreshedProxy(jobID).getTaskReports(request).getTaskReportList();
|
|
|
+ protocol = getRefreshedProxy(jobID);
|
|
|
+ if (protocol == null) {
|
|
|
+ return new org.apache.hadoop.mapreduce.TaskReport[0];
|
|
|
+ }
|
|
|
+ taskReports = protocol.getTaskReports(request).getTaskReportList();
|
|
|
} catch(YarnRemoteException yre) {
|
|
|
LOG.warn(RPCUtil.toString(yre));
|
|
|
throw yre;
|
|
@@ -403,6 +432,10 @@ public class ClientServiceDelegate {
|
|
|
= TypeConverter.toYarn(taskAttemptID);
|
|
|
KillTaskAttemptRequest killRequest = recordFactory.newRecordInstance(KillTaskAttemptRequest.class);
|
|
|
FailTaskAttemptRequest failRequest = recordFactory.newRecordInstance(FailTaskAttemptRequest.class);
|
|
|
+ MRClientProtocol protocol = getProxy(taskAttemptID.getJobID());
|
|
|
+ if (protocol == null) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
try {
|
|
|
if (fail) {
|
|
|
failRequest.setTaskAttemptId(attemptID);
|