|
@@ -22,6 +22,8 @@ import static org.mockito.Matchers.any;
|
|
|
import static org.mockito.Mockito.*;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Collection;
|
|
|
|
|
|
import junit.framework.Assert;
|
|
|
|
|
@@ -31,8 +33,13 @@ import org.apache.hadoop.mapreduce.JobStatus;
|
|
|
import org.apache.hadoop.mapreduce.MRConfig;
|
|
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.Counter;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
|
|
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
|
@@ -45,15 +52,30 @@ import org.apache.hadoop.yarn.ipc.RPCUtil;
|
|
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
|
import org.apache.hadoop.yarn.util.Records;
|
|
|
import org.junit.Test;
|
|
|
+import org.junit.runner.RunWith;
|
|
|
+import org.junit.runners.Parameterized;
|
|
|
+import org.junit.runners.Parameterized.Parameters;
|
|
|
|
|
|
/**
|
|
|
* Tests for ClientServiceDelegate.java
|
|
|
*/
|
|
|
|
|
|
+@RunWith(value = Parameterized.class)
|
|
|
public class TestClientServiceDelegate {
|
|
|
private JobID oldJobId = JobID.forName("job_1315895242400_2");
|
|
|
private org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter
|
|
|
.toYarn(oldJobId);
|
|
|
+ private boolean isAMReachableFromClient;
|
|
|
+
|
|
|
+ public TestClientServiceDelegate(boolean isAMReachableFromClient) {
|
|
|
+ this.isAMReachableFromClient = isAMReachableFromClient;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Parameters
|
|
|
+ public static Collection<Object[]> data() {
|
|
|
+ Object[][] data = new Object[][] { { true }, { false } };
|
|
|
+ return Arrays.asList(data);
|
|
|
+ }
|
|
|
|
|
|
@Test
|
|
|
public void testUnknownAppInRM() throws Exception {
|
|
@@ -150,9 +172,30 @@ public class TestClientServiceDelegate {
|
|
|
Assert.assertEquals(1.0f, jobStatus.getMapProgress());
|
|
|
Assert.assertEquals(1.0f, jobStatus.getReduceProgress());
|
|
|
}
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testCountersFromHistoryServer() throws Exception {
|
|
|
+ MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
|
|
|
+ when(historyServerProxy.getCounters(getCountersRequest())).thenReturn(
|
|
|
+ getCountersResponseFromHistoryServer());
|
|
|
+ ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
|
|
|
+ when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
|
|
|
+ .thenReturn(null);
|
|
|
+ ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
|
|
|
+ historyServerProxy, rm);
|
|
|
+
|
|
|
+ Counters counters = TypeConverter.toYarn(clientServiceDelegate.getJobCounters(oldJobId));
|
|
|
+ Assert.assertNotNull(counters);
|
|
|
+ Assert.assertEquals(1001, counters.getCounterGroup("dummyCounters").getCounter("dummyCounter").getValue());
|
|
|
+ }
|
|
|
|
|
|
@Test
|
|
|
public void testReconnectOnAMRestart() throws IOException {
|
|
|
+ //test not applicable when AM not reachable
|
|
|
+ //as instantiateAMProxy is not called at all
|
|
|
+ if(!isAMReachableFromClient) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
|
|
|
|
|
@@ -186,7 +229,7 @@ public class TestClientServiceDelegate {
|
|
|
MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class);
|
|
|
when(secondGenAMProxy.getJobReport(any(GetJobReportRequest.class)))
|
|
|
.thenReturn(jobReportResponse2);
|
|
|
-
|
|
|
+
|
|
|
ClientServiceDelegate clientServiceDelegate = spy(getClientServiceDelegate(
|
|
|
historyServerProxy, rmDelegate));
|
|
|
// First time, connection should be to AM1, then to AM2. Further requests
|
|
@@ -210,13 +253,13 @@ public class TestClientServiceDelegate {
|
|
|
verify(clientServiceDelegate, times(2)).instantiateAMProxy(
|
|
|
any(String.class));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private GetJobReportRequest getJobReportRequest() {
|
|
|
GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
|
|
|
request.setJobId(jobId);
|
|
|
return request;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private GetJobReportResponse getJobReportResponse() {
|
|
|
GetJobReportResponse jobReportResponse = Records
|
|
|
.newRecord(GetJobReportResponse.class);
|
|
@@ -227,6 +270,12 @@ public class TestClientServiceDelegate {
|
|
|
return jobReportResponse;
|
|
|
}
|
|
|
|
|
|
+ private GetCountersRequest getCountersRequest() {
|
|
|
+ GetCountersRequest request = Records.newRecord(GetCountersRequest.class);
|
|
|
+ request.setJobId(jobId);
|
|
|
+ return request;
|
|
|
+ }
|
|
|
+
|
|
|
private ApplicationReport getFinishedApplicationReport() {
|
|
|
return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId(
|
|
|
1234, 5), "user", "queue", "appname", "host", 124, null,
|
|
@@ -251,6 +300,7 @@ public class TestClientServiceDelegate {
|
|
|
MRClientProtocol historyServerProxy, ResourceMgrDelegate rm) {
|
|
|
Configuration conf = new YarnConfiguration();
|
|
|
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
|
|
|
+ conf.setBoolean(YarnConfiguration.RM_AM_NETWORK_ACL_CLOSED, !isAMReachableFromClient);
|
|
|
ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
|
|
|
conf, rm, oldJobId, historyServerProxy);
|
|
|
return clientServiceDelegate;
|
|
@@ -269,4 +319,21 @@ public class TestClientServiceDelegate {
|
|
|
jobReportResponse.setJobReport(jobReport);
|
|
|
return jobReportResponse;
|
|
|
}
|
|
|
+
|
|
|
+ private GetCountersResponse getCountersResponseFromHistoryServer() {
|
|
|
+ GetCountersResponse countersResponse = Records
|
|
|
+ .newRecord(GetCountersResponse.class);
|
|
|
+ Counter counter = Records.newRecord(Counter.class);
|
|
|
+ CounterGroup counterGroup = Records.newRecord(CounterGroup.class);
|
|
|
+ Counters counters = Records.newRecord(Counters.class);
|
|
|
+ counter.setDisplayName("dummyCounter");
|
|
|
+ counter.setName("dummyCounter");
|
|
|
+ counter.setValue(1001);
|
|
|
+ counterGroup.setName("dummyCounters");
|
|
|
+ counterGroup.setDisplayName("dummyCounters");
|
|
|
+ counterGroup.setCounter("dummyCounter", counter);
|
|
|
+ counters.setCounterGroup("dummyCounters", counterGroup);
|
|
|
+ countersResponse.setCounters(counters);
|
|
|
+ return countersResponse;
|
|
|
+ }
|
|
|
}
|