|
@@ -57,7 +57,6 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
|
|
-import org.apache.hadoop.yarn.ipc.RPCUtil;
|
|
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
|
import org.apache.hadoop.yarn.util.Records;
|
|
|
import org.junit.Test;
|
|
@@ -103,7 +102,7 @@ public class TestClientServiceDelegate {
|
|
|
|
|
|
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
|
|
|
when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
|
|
|
- RPCUtil.getRemoteException("Job ID doesnot Exist"));
|
|
|
+ new IOException("Job ID doesnot Exist"));
|
|
|
|
|
|
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
|
|
|
when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
|
|
@@ -199,8 +198,7 @@ public class TestClientServiceDelegate {
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void testReconnectOnAMRestart() throws IOException,
|
|
|
- YarnRemoteException {
|
|
|
+ public void testReconnectOnAMRestart() throws IOException {
|
|
|
//test not applicable when AM not reachable
|
|
|
//as instantiateAMProxy is not called at all
|
|
|
if(!isAMReachableFromClient) {
|
|
@@ -212,11 +210,15 @@ public class TestClientServiceDelegate {
|
|
|
// RM returns AM1 url, null, null and AM2 url on invocations.
|
|
|
// Nulls simulate the time when AM2 is in the process of restarting.
|
|
|
ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
|
|
|
- when(rmDelegate.getApplicationReport(jobId.getAppId())).thenReturn(
|
|
|
- getRunningApplicationReport("am1", 78)).thenReturn(
|
|
|
- getRunningApplicationReport(null, 0)).thenReturn(
|
|
|
- getRunningApplicationReport(null, 0)).thenReturn(
|
|
|
- getRunningApplicationReport("am2", 90));
|
|
|
+ try {
|
|
|
+ when(rmDelegate.getApplicationReport(jobId.getAppId())).thenReturn(
|
|
|
+ getRunningApplicationReport("am1", 78)).thenReturn(
|
|
|
+ getRunningApplicationReport(null, 0)).thenReturn(
|
|
|
+ getRunningApplicationReport(null, 0)).thenReturn(
|
|
|
+ getRunningApplicationReport("am2", 90));
|
|
|
+ } catch (YarnRemoteException e) {
|
|
|
+ throw new IOException(e);
|
|
|
+ }
|
|
|
|
|
|
GetJobReportResponse jobReportResponse1 = mock(GetJobReportResponse.class);
|
|
|
when(jobReportResponse1.getJobReport()).thenReturn(
|
|
@@ -267,7 +269,7 @@ public class TestClientServiceDelegate {
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void testAMAccessDisabled() throws IOException, YarnRemoteException {
|
|
|
+ public void testAMAccessDisabled() throws IOException {
|
|
|
//test only applicable when AM not reachable
|
|
|
if(isAMReachableFromClient) {
|
|
|
return;
|
|
@@ -278,11 +280,15 @@ public class TestClientServiceDelegate {
|
|
|
getJobReportResponseFromHistoryServer());
|
|
|
|
|
|
ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
|
|
|
- when(rmDelegate.getApplicationReport(jobId.getAppId())).thenReturn(
|
|
|
- getRunningApplicationReport("am1", 78)).thenReturn(
|
|
|
+ try {
|
|
|
+ when(rmDelegate.getApplicationReport(jobId.getAppId())).thenReturn(
|
|
|
getRunningApplicationReport("am1", 78)).thenReturn(
|
|
|
getRunningApplicationReport("am1", 78)).thenReturn(
|
|
|
- getFinishedApplicationReport());
|
|
|
+ getRunningApplicationReport("am1", 78)).thenReturn(
|
|
|
+ getFinishedApplicationReport());
|
|
|
+ } catch (YarnRemoteException e) {
|
|
|
+ throw new IOException(e);
|
|
|
+ }
|
|
|
|
|
|
ClientServiceDelegate clientServiceDelegate = spy(getClientServiceDelegate(
|
|
|
historyServerProxy, rmDelegate));
|
|
@@ -319,8 +325,7 @@ public class TestClientServiceDelegate {
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void testRMDownForJobStatusBeforeGetAMReport() throws IOException,
|
|
|
- YarnRemoteException {
|
|
|
+ public void testRMDownForJobStatusBeforeGetAMReport() throws IOException {
|
|
|
Configuration conf = new YarnConfiguration();
|
|
|
testRMDownForJobStatusBeforeGetAMReport(conf,
|
|
|
MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
|
|
@@ -328,7 +333,7 @@ public class TestClientServiceDelegate {
|
|
|
|
|
|
@Test
|
|
|
public void testRMDownForJobStatusBeforeGetAMReportWithRetryTimes()
|
|
|
- throws IOException, YarnRemoteException {
|
|
|
+ throws IOException {
|
|
|
Configuration conf = new YarnConfiguration();
|
|
|
conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 2);
|
|
|
testRMDownForJobStatusBeforeGetAMReport(conf, conf.getInt(
|
|
@@ -338,7 +343,7 @@ public class TestClientServiceDelegate {
|
|
|
|
|
|
@Test
|
|
|
public void testRMDownRestoreForJobStatusBeforeGetAMReport()
|
|
|
- throws IOException, YarnRemoteException {
|
|
|
+ throws IOException {
|
|
|
Configuration conf = new YarnConfiguration();
|
|
|
conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 3);
|
|
|
|
|
@@ -349,42 +354,52 @@ public class TestClientServiceDelegate {
|
|
|
when(historyServerProxy.getJobReport(any(GetJobReportRequest.class)))
|
|
|
.thenReturn(getJobReportResponse());
|
|
|
ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
|
|
|
- when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
|
|
|
- new java.lang.reflect.UndeclaredThrowableException(new IOException(
|
|
|
- "Connection refuced1"))).thenThrow(
|
|
|
- new java.lang.reflect.UndeclaredThrowableException(new IOException(
|
|
|
- "Connection refuced2"))).thenReturn(getFinishedApplicationReport());
|
|
|
- ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
|
|
|
- conf, rmDelegate, oldJobId, historyServerProxy);
|
|
|
- JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
|
|
|
- verify(rmDelegate, times(3)).getApplicationReport(any(ApplicationId.class));
|
|
|
- Assert.assertNotNull(jobStatus);
|
|
|
+ try {
|
|
|
+ when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
|
|
|
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
|
|
|
+ "Connection refuced1"))).thenThrow(
|
|
|
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
|
|
|
+ "Connection refuced2")))
|
|
|
+ .thenReturn(getFinishedApplicationReport());
|
|
|
+ ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
|
|
|
+ conf, rmDelegate, oldJobId, historyServerProxy);
|
|
|
+ JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
|
|
|
+ verify(rmDelegate, times(3)).getApplicationReport(
|
|
|
+ any(ApplicationId.class));
|
|
|
+ Assert.assertNotNull(jobStatus);
|
|
|
+ } catch (YarnRemoteException e) {
|
|
|
+ throw new IOException(e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void testRMDownForJobStatusBeforeGetAMReport(Configuration conf,
|
|
|
- int noOfRetries) throws YarnRemoteException, IOException {
|
|
|
+ int noOfRetries) throws IOException {
|
|
|
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
|
|
|
conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
|
|
|
!isAMReachableFromClient);
|
|
|
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
|
|
|
ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
|
|
|
- when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
|
|
|
- new java.lang.reflect.UndeclaredThrowableException(new IOException(
|
|
|
- "Connection refuced1"))).thenThrow(
|
|
|
- new java.lang.reflect.UndeclaredThrowableException(new IOException(
|
|
|
- "Connection refuced2"))).thenThrow(
|
|
|
- new java.lang.reflect.UndeclaredThrowableException(new IOException(
|
|
|
- "Connection refuced3")));
|
|
|
- ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
|
|
|
- conf, rmDelegate, oldJobId, historyServerProxy);
|
|
|
try {
|
|
|
- clientServiceDelegate.getJobStatus(oldJobId);
|
|
|
- Assert.fail("It should throw exception after retries");
|
|
|
- } catch (IOException e) {
|
|
|
- System.out.println("fail to get job status,and e=" + e.toString());
|
|
|
+ when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
|
|
|
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
|
|
|
+ "Connection refuced1"))).thenThrow(
|
|
|
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
|
|
|
+ "Connection refuced2"))).thenThrow(
|
|
|
+ new java.lang.reflect.UndeclaredThrowableException(new IOException(
|
|
|
+ "Connection refuced3")));
|
|
|
+ ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
|
|
|
+ conf, rmDelegate, oldJobId, historyServerProxy);
|
|
|
+ try {
|
|
|
+ clientServiceDelegate.getJobStatus(oldJobId);
|
|
|
+ Assert.fail("It should throw exception after retries");
|
|
|
+ } catch (IOException e) {
|
|
|
+ System.out.println("fail to get job status,and e=" + e.toString());
|
|
|
+ }
|
|
|
+ verify(rmDelegate, times(noOfRetries)).getApplicationReport(
|
|
|
+ any(ApplicationId.class));
|
|
|
+ } catch (YarnRemoteException e) {
|
|
|
+ throw new IOException(e);
|
|
|
}
|
|
|
- verify(rmDelegate, times(noOfRetries)).getApplicationReport(
|
|
|
- any(ApplicationId.class));
|
|
|
}
|
|
|
|
|
|
private GetJobReportRequest getJobReportRequest() {
|
|
@@ -429,10 +444,13 @@ public class TestClientServiceDelegate {
|
|
|
"N/A", 0.0f);
|
|
|
}
|
|
|
|
|
|
- private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException,
|
|
|
- IOException {
|
|
|
+ private ResourceMgrDelegate getRMDelegate() throws IOException {
|
|
|
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
|
|
|
- when(rm.getApplicationReport(jobId.getAppId())).thenReturn(null);
|
|
|
+ try {
|
|
|
+ when(rm.getApplicationReport(jobId.getAppId())).thenReturn(null);
|
|
|
+ } catch (YarnRemoteException e) {
|
|
|
+ throw new IOException(e);
|
|
|
+ }
|
|
|
return rm;
|
|
|
}
|
|
|
|