|
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapred;
|
|
|
import java.net.InetAddress;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.UnknownHostException;
|
|
|
+import java.util.Iterator;
|
|
|
|
|
|
import junit.framework.Assert;
|
|
|
|
|
@@ -28,6 +29,9 @@ import org.apache.avro.ipc.Server;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
+import org.apache.hadoop.mapreduce.ClientFactory;
|
|
|
+import org.apache.hadoop.mapreduce.Cluster;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
|
|
@@ -51,11 +55,12 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest
|
|
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.Counter;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|
|
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
|
-import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
|
|
-import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
|
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
|
|
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
@@ -89,7 +94,6 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
import org.apache.hadoop.yarn.factory.providers.YarnRemoteExceptionFactoryProvider;
|
|
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
|
|
|
import org.apache.hadoop.yarn.service.AbstractService;
|
|
|
import org.junit.Test;
|
|
|
|
|
@@ -105,7 +109,6 @@ public class TestClientRedirect {
|
|
|
|
|
|
private static final String AMHOSTADDRESS = "0.0.0.0:10020";
|
|
|
private static final String HSHOSTADDRESS = "0.0.0.0:10021";
|
|
|
- private static final int HSPORT = 10020;
|
|
|
private volatile boolean amContact = false;
|
|
|
private volatile boolean hsContact = false;
|
|
|
private volatile boolean amRunning = false;
|
|
@@ -114,6 +117,8 @@ public class TestClientRedirect {
|
|
|
public void testRedirect() throws Exception {
|
|
|
|
|
|
Configuration conf = new YarnConfiguration();
|
|
|
+ conf.setClass("mapreduce.clientfactory.class.name",
|
|
|
+ YarnClientFactory.class, ClientFactory.class);
|
|
|
conf.set(YarnConfiguration.APPSMANAGER_ADDRESS, RMADDRESS);
|
|
|
conf.set(JHConfig.HS_BIND_ADDRESS, HSHOSTADDRESS);
|
|
|
RMService rmService = new RMService("test");
|
|
@@ -130,18 +135,46 @@ public class TestClientRedirect {
|
|
|
historyService.start(conf);
|
|
|
|
|
|
LOG.info("services started");
|
|
|
- YARNRunner yarnRunner = new YARNRunner(conf);
|
|
|
- Throwable t = null;
|
|
|
+ Cluster cluster = new Cluster(conf);
|
|
|
org.apache.hadoop.mapreduce.JobID jobID =
|
|
|
new org.apache.hadoop.mapred.JobID("201103121733", 1);
|
|
|
- yarnRunner.getJobCounters(jobID);
|
|
|
+ org.apache.hadoop.mapreduce.Counters counters = cluster.getJob(jobID)
|
|
|
+ .getCounters();
|
|
|
+ Iterator<org.apache.hadoop.mapreduce.CounterGroup> it = counters.iterator();
|
|
|
+ while (it.hasNext()) {
|
|
|
+ org.apache.hadoop.mapreduce.CounterGroup group = it.next();
|
|
|
+ LOG.info("Group " + group.getDisplayName());
|
|
|
+ Iterator<org.apache.hadoop.mapreduce.Counter> itc = group.iterator();
|
|
|
+ while (itc.hasNext()) {
|
|
|
+ LOG.info("Counter is " + itc.next().getDisplayName());
|
|
|
+ }
|
|
|
+ }
|
|
|
Assert.assertTrue(amContact);
|
|
|
-
|
|
|
+
|
|
|
+ LOG.info("Sleeping for 5 seconds before stop for" +
|
|
|
+ " the client socket to not get EOF immediately..");
|
|
|
+ Thread.sleep(5000);
|
|
|
+
|
|
|
//bring down the AM service
|
|
|
amService.stop();
|
|
|
amRunning = false;
|
|
|
-
|
|
|
- yarnRunner.getJobCounters(jobID);
|
|
|
+
|
|
|
+ LOG.info("Sleeping for 5 seconds after stop for" +
|
|
|
+ " the server to exit cleanly..");
|
|
|
+ Thread.sleep(5000);
|
|
|
+
|
|
|
+ // Same client
|
|
|
+ counters = cluster.getJob(jobID).getCounters();
|
|
|
+ it = counters.iterator();
|
|
|
+ while (it.hasNext()) {
|
|
|
+ org.apache.hadoop.mapreduce.CounterGroup group = it.next();
|
|
|
+ LOG.info("Group " + group.getDisplayName());
|
|
|
+ Iterator<org.apache.hadoop.mapreduce.Counter> itc = group.iterator();
|
|
|
+ while (itc.hasNext()) {
|
|
|
+ LOG.info("Counter is " + itc.next().getDisplayName());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
Assert.assertTrue(hsContact);
|
|
|
|
|
|
rmService.stop();
|
|
@@ -149,7 +182,6 @@ public class TestClientRedirect {
|
|
|
}
|
|
|
|
|
|
class RMService extends AbstractService implements ClientRMProtocol {
|
|
|
- private ApplicationsManager applicationsManager;
|
|
|
private String clientServiceBindAddress;
|
|
|
InetSocketAddress clientBindAddress;
|
|
|
private Server server;
|
|
@@ -208,45 +240,45 @@ public class TestClientRedirect {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request) throws YarnRemoteException {
|
|
|
- throw YarnRemoteExceptionFactoryProvider.getYarnRemoteExceptionFactory(null).createYarnRemoteException("Test");
|
|
|
+ public SubmitApplicationResponse submitApplication(
|
|
|
+ SubmitApplicationRequest request) throws YarnRemoteException {
|
|
|
+ throw YarnRemoteExceptionFactoryProvider.getYarnRemoteExceptionFactory(
|
|
|
+ null).createYarnRemoteException("Test");
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
- public FinishApplicationResponse finishApplication(FinishApplicationRequest request) throws YarnRemoteException {
|
|
|
+ public FinishApplicationResponse finishApplication(
|
|
|
+ FinishApplicationRequest request) throws YarnRemoteException {
|
|
|
return null;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
- public GetClusterMetricsResponse getClusterMetrics(GetClusterMetricsRequest request) throws YarnRemoteException {
|
|
|
+ public GetClusterMetricsResponse getClusterMetrics(
|
|
|
+ GetClusterMetricsRequest request) throws YarnRemoteException {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public GetAllApplicationsResponse getAllApplications(
|
|
|
GetAllApplicationsRequest request) throws YarnRemoteException {
|
|
|
- // TODO Auto-generated method stub
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public GetClusterNodesResponse getClusterNodes(
|
|
|
GetClusterNodesRequest request) throws YarnRemoteException {
|
|
|
- // TODO Auto-generated method stub
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
|
|
|
throws YarnRemoteException {
|
|
|
- // TODO Auto-generated method stub
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public GetQueueUserAclsInfoResponse getQueueUserAcls(
|
|
|
GetQueueUserAclsInfoRequest request) throws YarnRemoteException {
|
|
|
- // TODO Auto-generated method stub
|
|
|
return null;
|
|
|
}
|
|
|
}
|
|
@@ -258,10 +290,8 @@ public class TestClientRedirect {
|
|
|
|
|
|
@Override
|
|
|
public GetCountersResponse getCounters(GetCountersRequest request) throws YarnRemoteException {
|
|
|
- JobId jobId = request.getJobId();
|
|
|
hsContact = true;
|
|
|
- Counters counters = recordFactory.newRecordInstance(Counters.class);
|
|
|
-// counters.groups = new HashMap<CharSequence, CounterGroup>();
|
|
|
+ Counters counters = getMyCounters();
|
|
|
GetCountersResponse response = recordFactory.newRecordInstance(GetCountersResponse.class);
|
|
|
response.setCounters(counters);
|
|
|
return response;
|
|
@@ -278,7 +308,7 @@ public class TestClientRedirect {
|
|
|
}
|
|
|
|
|
|
public AMService(String hostAddress) {
|
|
|
- super("TestClientService");
|
|
|
+ super("AMService");
|
|
|
this.hostAddress = hostAddress;
|
|
|
}
|
|
|
|
|
@@ -310,79 +340,107 @@ public class TestClientRedirect {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public GetCountersResponse getCounters(GetCountersRequest request) throws YarnRemoteException {
|
|
|
+ public GetCountersResponse getCounters(GetCountersRequest request)
|
|
|
+ throws YarnRemoteException {
|
|
|
JobId jobID = request.getJobId();
|
|
|
-
|
|
|
+
|
|
|
amContact = true;
|
|
|
- Counters counters = recordFactory.newRecordInstance(Counters.class);
|
|
|
-// counters.groups = new HashMap<CharSequence, CounterGroup>();
|
|
|
- GetCountersResponse response = recordFactory.newRecordInstance(GetCountersResponse.class);
|
|
|
- response.setCounters(counters);
|
|
|
- return response;
|
|
|
- }
|
|
|
+
|
|
|
+ Counters counters = getMyCounters();
|
|
|
+ GetCountersResponse response = recordFactory
|
|
|
+ .newRecordInstance(GetCountersResponse.class);
|
|
|
+ response.setCounters(counters);
|
|
|
+ return response;
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
|
- public GetJobReportResponse getJobReport(GetJobReportRequest request) throws YarnRemoteException {
|
|
|
- JobId jobId = request.getJobId();
|
|
|
- return null;
|
|
|
+ public GetJobReportResponse getJobReport(GetJobReportRequest request)
|
|
|
+ throws YarnRemoteException {
|
|
|
+
|
|
|
+ amContact = true;
|
|
|
+
|
|
|
+ JobReport jobReport = recordFactory.newRecordInstance(JobReport.class);
|
|
|
+ jobReport.setJobId(request.getJobId());
|
|
|
+ jobReport.setJobState(JobState.RUNNING);
|
|
|
+ GetJobReportResponse response = recordFactory
|
|
|
+ .newRecordInstance(GetJobReportResponse.class);
|
|
|
+ response.setJobReport(jobReport);
|
|
|
+ return response;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public GetTaskReportResponse getTaskReport(GetTaskReportRequest request) throws YarnRemoteException {
|
|
|
- TaskId taskID = request.getTaskId();
|
|
|
+ public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
|
|
|
+ throws YarnRemoteException {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
@Override
|
|
|
- public GetTaskAttemptReportResponse getTaskAttemptReport(GetTaskAttemptReportRequest request) throws YarnRemoteException {
|
|
|
- TaskAttemptId taskAttemptID = request.getTaskAttemptId();
|
|
|
+ public GetTaskAttemptReportResponse getTaskAttemptReport(
|
|
|
+ GetTaskAttemptReportRequest request) throws YarnRemoteException {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(GetTaskAttemptCompletionEventsRequest request) throws YarnRemoteException {
|
|
|
- JobId jobId = request.getJobId();
|
|
|
- int fromEventId = request.getFromEventId();
|
|
|
- int maxEvents = request.getMaxEvents();
|
|
|
+ public GetTaskAttemptCompletionEventsResponse
|
|
|
+ getTaskAttemptCompletionEvents(
|
|
|
+ GetTaskAttemptCompletionEventsRequest request)
|
|
|
+ throws YarnRemoteException {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request) throws YarnRemoteException {
|
|
|
- JobId jobID = request.getJobId();
|
|
|
- TaskType taskType = request.getTaskType();
|
|
|
+ public GetTaskReportsResponse
|
|
|
+ getTaskReports(GetTaskReportsRequest request)
|
|
|
+ throws YarnRemoteException {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request) throws YarnRemoteException {
|
|
|
- TaskAttemptId taskAttemptID = request.getTaskAttemptId();
|
|
|
+ public GetDiagnosticsResponse
|
|
|
+ getDiagnostics(GetDiagnosticsRequest request)
|
|
|
+ throws YarnRemoteException {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public KillJobResponse killJob(KillJobRequest request) throws YarnRemoteException {
|
|
|
- JobId jobID = request.getJobId();
|
|
|
+ public KillJobResponse killJob(KillJobRequest request)
|
|
|
+ throws YarnRemoteException {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public KillTaskResponse killTask(KillTaskRequest request) throws YarnRemoteException {
|
|
|
- TaskId taskID = request.getTaskId();
|
|
|
+ public KillTaskResponse killTask(KillTaskRequest request)
|
|
|
+ throws YarnRemoteException {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public KillTaskAttemptResponse killTaskAttempt(KillTaskAttemptRequest request) throws YarnRemoteException {
|
|
|
- TaskAttemptId taskAttemptID = request.getTaskAttemptId();
|
|
|
+ public KillTaskAttemptResponse killTaskAttempt(
|
|
|
+ KillTaskAttemptRequest request) throws YarnRemoteException {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public FailTaskAttemptResponse failTaskAttempt(FailTaskAttemptRequest request) throws YarnRemoteException {
|
|
|
- TaskAttemptId taskAttemptID = request.getTaskAttemptId();
|
|
|
+ public FailTaskAttemptResponse failTaskAttempt(
|
|
|
+ FailTaskAttemptRequest request) throws YarnRemoteException {
|
|
|
return null;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ static Counters getMyCounters() {
|
|
|
+ Counter counter = recordFactory.newRecordInstance(Counter.class);
|
|
|
+ counter.setName("Mycounter");
|
|
|
+ counter.setDisplayName("My counter display name");
|
|
|
+ counter.setValue(12345);
|
|
|
+
|
|
|
+ CounterGroup group = recordFactory
|
|
|
+ .newRecordInstance(CounterGroup.class);
|
|
|
+ group.setName("MyGroup");
|
|
|
+ group.setDisplayName("My groupd display name");
|
|
|
+ group.setCounter("myCounter", counter);
|
|
|
+
|
|
|
+ Counters counters = recordFactory.newRecordInstance(Counters.class);
|
|
|
+ counters.setCounterGroup("myGroupd", group);
|
|
|
+ return counters;
|
|
|
+ }
|
|
|
}
|