|
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.Counters;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
|
|
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
@@ -122,8 +123,7 @@ public class TestClientServiceDelegate {
|
|
|
|
|
|
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
|
|
|
when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
|
|
|
- new RuntimeException("1")).thenThrow(new RuntimeException("2"))
|
|
|
- .thenThrow(new RuntimeException("3"))
|
|
|
+ new RuntimeException("1")).thenThrow(new RuntimeException("2"))
|
|
|
.thenReturn(getJobReportResponse());
|
|
|
|
|
|
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
|
|
@@ -135,7 +135,7 @@ public class TestClientServiceDelegate {
|
|
|
|
|
|
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
|
|
|
Assert.assertNotNull(jobStatus);
|
|
|
- verify(historyServerProxy, times(4)).getJobReport(
|
|
|
+ verify(historyServerProxy, times(3)).getJobReport(
|
|
|
any(GetJobReportRequest.class));
|
|
|
}
|
|
|
|
|
@@ -312,6 +312,74 @@ public class TestClientServiceDelegate {
|
|
|
any(String.class));
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testRMDownForJobStatusBeforeGetAMReport() throws IOException {
|
|
|
+ Configuration conf = new YarnConfiguration();
|
|
|
+ testRMDownForJobStatusBeforeGetAMReport(conf,
|
|
|
+ MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testRMDownForJobStatusBeforeGetAMReportWithRetryTimes()
|
|
|
+ throws IOException {
|
|
|
+ Configuration conf = new YarnConfiguration();
|
|
|
+ conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 2);
|
|
|
+ testRMDownForJobStatusBeforeGetAMReport(conf, conf.getInt(
|
|
|
+ MRJobConfig.MR_CLIENT_MAX_RETRIES,
|
|
|
+ MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testRMDownRestoreForJobStatusBeforeGetAMReport()
|
|
|
+ throws IOException {
|
|
|
+ Configuration conf = new YarnConfiguration();
|
|
|
+ conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 3);
|
|
|
+
|
|
|
+ conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
|
|
|
+ conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
|
|
|
+ !isAMReachableFromClient);
|
|
|
+ MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
|
|
|
+ when(historyServerProxy.getJobReport(any(GetJobReportRequest.class)))
|
|
|
+ .thenReturn(getJobReportResponse());
|
|
|
+ ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
|
|
|
+ when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
|
|
|
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
|
|
|
+ "Connection refuced1"))).thenThrow(
|
|
|
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
|
|
|
+ "Connection refuced2"))).thenReturn(getFinishedApplicationReport());
|
|
|
+ ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
|
|
|
+ conf, rmDelegate, oldJobId, historyServerProxy);
|
|
|
+ JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
|
|
|
+ verify(rmDelegate, times(3)).getApplicationReport(any(ApplicationId.class));
|
|
|
+ Assert.assertNotNull(jobStatus);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testRMDownForJobStatusBeforeGetAMReport(Configuration conf,
|
|
|
+ int noOfRetries) throws YarnRemoteException {
|
|
|
+ conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
|
|
|
+ conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
|
|
|
+ !isAMReachableFromClient);
|
|
|
+ MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
|
|
|
+ ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
|
|
|
+ when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
|
|
|
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
|
|
|
+ "Connection refuced1"))).thenThrow(
|
|
|
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
|
|
|
+ "Connection refuced2"))).thenThrow(
|
|
|
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
|
|
|
+ "Connection refuced3")));
|
|
|
+ ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
|
|
|
+ conf, rmDelegate, oldJobId, historyServerProxy);
|
|
|
+ try {
|
|
|
+ clientServiceDelegate.getJobStatus(oldJobId);
|
|
|
+ Assert.fail("It should throw exception after retries");
|
|
|
+ } catch (IOException e) {
|
|
|
+ System.out.println("fail to get job status,and e=" + e.toString());
|
|
|
+ }
|
|
|
+ verify(rmDelegate, times(noOfRetries)).getApplicationReport(
|
|
|
+ any(ApplicationId.class));
|
|
|
+ }
|
|
|
+
|
|
|
private GetJobReportRequest getJobReportRequest() {
|
|
|
GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
|
|
|
request.setJobId(jobId);
|