|
@@ -72,13 +72,14 @@ import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.security.Credentials;
|
|
import org.apache.hadoop.security.Credentials;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.yarn.Clock;
|
|
import org.apache.hadoop.yarn.Clock;
|
|
|
|
+import org.apache.hadoop.yarn.ClusterInfo;
|
|
|
|
+import org.apache.hadoop.yarn.SystemClock;
|
|
import org.apache.hadoop.yarn.YarnException;
|
|
import org.apache.hadoop.yarn.YarnException;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
-import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
@@ -101,7 +102,8 @@ public class MRApp extends MRAppMaster {
|
|
|
|
|
|
private File testWorkDir;
|
|
private File testWorkDir;
|
|
private Path testAbsPath;
|
|
private Path testAbsPath;
|
|
-
|
|
|
|
|
|
+ private ClusterInfo clusterInfo;
|
|
|
|
+
|
|
public static String NM_HOST = "localhost";
|
|
public static String NM_HOST = "localhost";
|
|
public static int NM_PORT = 1234;
|
|
public static int NM_PORT = 1234;
|
|
public static int NM_HTTP_PORT = 8042;
|
|
public static int NM_HTTP_PORT = 8042;
|
|
@@ -120,6 +122,11 @@ public class MRApp extends MRAppMaster {
|
|
applicationId.setId(0);
|
|
applicationId.setId(0);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public MRApp(int maps, int reduces, boolean autoComplete, String testName,
|
|
|
|
+ boolean cleanOnStart, Clock clock) {
|
|
|
|
+ this(maps, reduces, autoComplete, testName, cleanOnStart, 1, clock);
|
|
|
|
+ }
|
|
|
|
+
|
|
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
|
|
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
|
|
boolean cleanOnStart) {
|
|
boolean cleanOnStart) {
|
|
this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
|
|
this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
|
|
@@ -149,15 +156,28 @@ public class MRApp extends MRAppMaster {
|
|
|
|
|
|
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
|
|
public MRApp(int maps, int reduces, boolean autoComplete, String testName,
|
|
boolean cleanOnStart, int startCount) {
|
|
boolean cleanOnStart, int startCount) {
|
|
|
|
+ this(maps, reduces, autoComplete, testName, cleanOnStart, startCount,
|
|
|
|
+ new SystemClock());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public MRApp(int maps, int reduces, boolean autoComplete, String testName,
|
|
|
|
+ boolean cleanOnStart, int startCount, Clock clock) {
|
|
this(getApplicationAttemptId(applicationId, startCount), getContainerId(
|
|
this(getApplicationAttemptId(applicationId, startCount), getContainerId(
|
|
applicationId, startCount), maps, reduces, autoComplete, testName,
|
|
applicationId, startCount), maps, reduces, autoComplete, testName,
|
|
- cleanOnStart, startCount);
|
|
|
|
|
|
+ cleanOnStart, startCount, clock);
|
|
}
|
|
}
|
|
|
|
|
|
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
|
|
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
|
|
int maps, int reduces, boolean autoComplete, String testName,
|
|
int maps, int reduces, boolean autoComplete, String testName,
|
|
boolean cleanOnStart, int startCount) {
|
|
boolean cleanOnStart, int startCount) {
|
|
- super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, System
|
|
|
|
|
|
+ this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName,
|
|
|
|
+ cleanOnStart, startCount, new SystemClock());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
|
|
|
|
+ int maps, int reduces, boolean autoComplete, String testName,
|
|
|
|
+ boolean cleanOnStart, int startCount, Clock clock) {
|
|
|
|
+ super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System
|
|
.currentTimeMillis());
|
|
.currentTimeMillis());
|
|
this.testWorkDir = new File("target", testName);
|
|
this.testWorkDir = new File("target", testName);
|
|
testAbsPath = new Path(testWorkDir.getAbsolutePath());
|
|
testAbsPath = new Path(testWorkDir.getAbsolutePath());
|
|
@@ -171,12 +191,28 @@ public class MRApp extends MRAppMaster {
|
|
throw new YarnException("could not cleanup test dir", e);
|
|
throw new YarnException("could not cleanup test dir", e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
this.maps = maps;
|
|
this.maps = maps;
|
|
this.reduces = reduces;
|
|
this.reduces = reduces;
|
|
this.autoComplete = autoComplete;
|
|
this.autoComplete = autoComplete;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public void init(Configuration conf) {
|
|
|
|
+ super.init(conf);
|
|
|
|
+ if (this.clusterInfo != null) {
|
|
|
|
+ getContext().getClusterInfo().setMinContainerCapability(
|
|
|
|
+ this.clusterInfo.getMinContainerCapability());
|
|
|
|
+ getContext().getClusterInfo().setMaxContainerCapability(
|
|
|
|
+ this.clusterInfo.getMaxContainerCapability());
|
|
|
|
+ } else {
|
|
|
|
+ getContext().getClusterInfo().setMinContainerCapability(
|
|
|
|
+ BuilderUtils.newResource(1024));
|
|
|
|
+ getContext().getClusterInfo().setMaxContainerCapability(
|
|
|
|
+ BuilderUtils.newResource(10240));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
public Job submit(Configuration conf) throws Exception {
|
|
public Job submit(Configuration conf) throws Exception {
|
|
String user = conf.get(MRJobConfig.USER_NAME, UserGroupInformation
|
|
String user = conf.get(MRJobConfig.USER_NAME, UserGroupInformation
|
|
.getCurrentUser().getShortUserName());
|
|
.getCurrentUser().getShortUserName());
|
|
@@ -303,7 +339,7 @@ public class MRApp extends MRAppMaster {
|
|
getDispatcher().getEventHandler(),
|
|
getDispatcher().getEventHandler(),
|
|
getTaskAttemptListener(), getContext().getClock(),
|
|
getTaskAttemptListener(), getContext().getClock(),
|
|
getCommitter(), isNewApiCommitter(),
|
|
getCommitter(), isNewApiCommitter(),
|
|
- currentUser.getUserName());
|
|
|
|
|
|
+ currentUser.getUserName(), getContext());
|
|
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
|
|
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
|
|
|
|
|
|
getDispatcher().register(JobFinishEvent.Type.class,
|
|
getDispatcher().register(JobFinishEvent.Type.class,
|
|
@@ -391,7 +427,7 @@ public class MRApp extends MRAppMaster {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
protected ContainerAllocator createContainerAllocator(
|
|
protected ContainerAllocator createContainerAllocator(
|
|
- ClientService clientService, AppContext context) {
|
|
|
|
|
|
+ ClientService clientService, final AppContext context) {
|
|
return new ContainerAllocator(){
|
|
return new ContainerAllocator(){
|
|
private int containerCount;
|
|
private int containerCount;
|
|
@Override
|
|
@Override
|
|
@@ -447,6 +483,17 @@ public class MRApp extends MRAppMaster {
|
|
};
|
|
};
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public void setClusterInfo(ClusterInfo clusterInfo) {
|
|
|
|
+ // Only useful if set before a job is started.
|
|
|
|
+ if (getServiceState() == Service.STATE.NOTINITED
|
|
|
|
+ || getServiceState() == Service.STATE.INITED) {
|
|
|
|
+ this.clusterInfo = clusterInfo;
|
|
|
|
+ } else {
|
|
|
|
+ throw new IllegalStateException(
|
|
|
|
+ "ClusterInfo can only be set before the App is STARTED");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
class TestJob extends JobImpl {
|
|
class TestJob extends JobImpl {
|
|
//override the init transition
|
|
//override the init transition
|
|
private final TestInitTransition initTransition = new TestInitTransition(
|
|
private final TestInitTransition initTransition = new TestInitTransition(
|
|
@@ -470,12 +517,14 @@ public class MRApp extends MRAppMaster {
|
|
public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
|
|
public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
|
|
Configuration conf, EventHandler eventHandler,
|
|
Configuration conf, EventHandler eventHandler,
|
|
TaskAttemptListener taskAttemptListener, Clock clock,
|
|
TaskAttemptListener taskAttemptListener, Clock clock,
|
|
- OutputCommitter committer, boolean newApiCommitter, String user) {
|
|
|
|
|
|
+ OutputCommitter committer, boolean newApiCommitter, String user,
|
|
|
|
+ AppContext appContext) {
|
|
super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
|
|
super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
|
|
conf, eventHandler, taskAttemptListener,
|
|
conf, eventHandler, taskAttemptListener,
|
|
new JobTokenSecretManager(), new Credentials(), clock,
|
|
new JobTokenSecretManager(), new Credentials(), clock,
|
|
getCompletedTaskFromPreviousRun(), metrics, committer,
|
|
getCompletedTaskFromPreviousRun(), metrics, committer,
|
|
- newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos());
|
|
|
|
|
|
+ newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
|
|
|
|
+ appContext);
|
|
|
|
|
|
// This "this leak" is okay because the retained pointer is in an
|
|
// This "this leak" is okay because the retained pointer is in an
|
|
// instance variable.
|
|
// instance variable.
|