|
@@ -58,16 +58,12 @@ import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
|
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
|
|
import org.apache.hadoop.net.TableMapping;
|
|
|
-import org.apache.hadoop.tools.rumen.JobTraceReader;
|
|
|
-import org.apache.hadoop.tools.rumen.LoggedJob;
|
|
|
-import org.apache.hadoop.util.ReflectionUtils;
|
|
|
import org.apache.hadoop.util.Tool;
|
|
|
import org.apache.hadoop.util.ToolRunner;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
|
|
-import org.apache.hadoop.yarn.api.records.ReservationId;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
@@ -84,19 +80,32 @@ import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
|
|
|
import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
|
|
|
import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
|
|
|
import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
|
|
|
-import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
|
|
|
-import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
|
|
|
import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;
|
|
|
+import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
|
|
|
import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
|
|
|
-import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
|
|
|
+import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
|
|
|
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
|
|
|
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
|
|
|
-import org.apache.hadoop.yarn.util.UTCClock;
|
|
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
|
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import java.io.File;
|
|
|
+import java.io.IOException;
|
|
|
+import java.security.Security;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Random;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
@Private
|
|
|
@Unstable
|
|
|
public class SLSRunner extends Configured implements Tool {
|
|
@@ -112,21 +121,12 @@ public class SLSRunner extends Configured implements Tool {
|
|
|
private Resource nodeManagerResource;
|
|
|
private String nodeFile;
|
|
|
|
|
|
- // AM simulator
|
|
|
- private int AM_ID;
|
|
|
- private Map<String, AMSimulator> amMap;
|
|
|
- private Map<ApplicationId, AMSimulator> appIdAMSim;
|
|
|
- private Set<String> trackedApps;
|
|
|
- private Map<String, Class> amClassMap;
|
|
|
- private static int remainingApps = 0;
|
|
|
-
|
|
|
// metrics
|
|
|
private String metricsOutputDir;
|
|
|
private boolean printSimulation;
|
|
|
|
|
|
// other simulation information
|
|
|
- private int numNMs, numRacks, numAMs, numTasks;
|
|
|
- private long maxRuntime;
|
|
|
+ private int numNMs, numRacks;
|
|
|
private String tableMapping;
|
|
|
|
|
|
private final static Map<String, Object> simulateInfoMap = new HashMap<>();
|
|
@@ -135,6 +135,7 @@ public class SLSRunner extends Configured implements Tool {
|
|
|
public final static Logger LOG = LoggerFactory.getLogger(SLSRunner.class);
|
|
|
|
|
|
private static boolean exitAtTheFinish = false;
|
|
|
+ private AMRunner amRunner;
|
|
|
|
|
|
/**
|
|
|
* The type of trace in input.
|
|
@@ -151,7 +152,7 @@ public class SLSRunner extends Configured implements Tool {
|
|
|
private SynthTraceJobProducer stjp;
|
|
|
|
|
|
public static int getRemainingApps() {
|
|
|
- return remainingApps;
|
|
|
+ return AMRunner.REMAINING_APPS;
|
|
|
}
|
|
|
|
|
|
public SLSRunner() throws ClassNotFoundException {
|
|
@@ -176,9 +177,7 @@ public class SLSRunner extends Configured implements Tool {
|
|
|
private void init(Configuration tempConf) throws ClassNotFoundException {
|
|
|
nmMap = new ConcurrentHashMap<>();
|
|
|
queueAppNumMap = new HashMap<>();
|
|
|
- amMap = new ConcurrentHashMap<>();
|
|
|
- amClassMap = new HashMap<>();
|
|
|
- appIdAMSim = new ConcurrentHashMap<>();
|
|
|
+ amRunner = new AMRunner(runner, this);
|
|
|
// runner configuration
|
|
|
setConf(tempConf);
|
|
|
|
|
@@ -186,15 +185,8 @@ public class SLSRunner extends Configured implements Tool {
|
|
|
poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
|
|
|
SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
|
|
|
SLSRunner.runner.setQueueSize(poolSize);
|
|
|
- // <AMType, Class> map
|
|
|
- for (Map.Entry e : tempConf) {
|
|
|
- String key = e.getKey().toString();
|
|
|
- if (key.startsWith(SLSConfiguration.AM_TYPE_PREFIX)) {
|
|
|
- String amType = key.substring(SLSConfiguration.AM_TYPE_PREFIX.length());
|
|
|
- amClassMap.put(amType, Class.forName(tempConf.get(key)));
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
+ amRunner.init(tempConf);
|
|
|
nodeManagerResource = getNodeManagerResource();
|
|
|
}
|
|
|
|
|
@@ -227,14 +219,25 @@ public class SLSRunner extends Configured implements Tool {
|
|
|
return Collections.unmodifiableMap(simulateInfoMap);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
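+ * Sets the simulation inputs; this is invoked before start.
|
|
|
+ * @param inType type of the input traces
|
|
|
+ * @param inTraces input traces; the array is copied
|
|
|
+ * @param nodes value stored as the node file
|
|
|
+ * @param outDir output directory for metrics and tableMapping.csv
|
|
|
+ * @param trackApps apps to track, handed to the AM runner
|
|
|
+ * @param printsimulation whether to print the simulation info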
|
|
|
+ */
|
|
|
public void setSimulationParams(TraceType inType, String[] inTraces,
|
|
|
String nodes, String outDir, Set<String> trackApps,
|
|
|
boolean printsimulation) {
|
|
|
|
|
|
this.inputType = inType;
|
|
|
this.inputTraces = inTraces.clone();
|
|
|
+ this.amRunner.setInputType(this.inputType);
|
|
|
+ this.amRunner.setInputTraces(this.inputTraces);
|
|
|
+ this.amRunner.setTrackedApps(trackApps);
|
|
|
this.nodeFile = nodes;
|
|
|
- this.trackedApps = trackApps;
|
|
|
this.printSimulation = printsimulation;
|
|
|
metricsOutputDir = outDir;
|
|
|
tableMapping = outDir + "/tableMapping.csv";
|
|
@@ -247,15 +250,16 @@ public class SLSRunner extends Configured implements Tool {
|
|
|
|
|
|
// start resource manager
|
|
|
startRM();
|
|
|
+ amRunner.setResourceManager(rm);
|
|
|
// start node managers
|
|
|
startNM();
|
|
|
// start application masters
|
|
|
- startAM();
|
|
|
+ amRunner.startAM();
|
|
|
// set queue & tracked apps information
|
|
|
((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
|
|
|
.setQueueSet(this.queueAppNumMap.keySet());
|
|
|
((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
|
|
|
- .setTrackedAppSet(this.trackedApps);
|
|
|
+ .setTrackedAppSet(amRunner.getTrackedApps());
|
|
|
// print out simulation info
|
|
|
printSimulationInfo();
|
|
|
// blocked until all nodes RUNNING
|
|
@@ -310,7 +314,7 @@ public class SLSRunner extends Configured implements Tool {
|
|
|
rm = new ResourceManager() {
|
|
|
@Override
|
|
|
protected ApplicationMasterLauncher createAMLauncher() {
|
|
|
- return new MockAMLauncher(se, this.rmContext, appIdAMSim);
|
|
|
+ return new MockAMLauncher(se, this.rmContext);
|
|
|
}
|
|
|
};
|
|
|
|
|
@@ -422,109 +426,6 @@ public class SLSRunner extends Configured implements Tool {
|
|
|
System.currentTimeMillis() - startTimeMS);
|
|
|
}
|
|
|
|
|
|
- private void startAM() throws YarnException, IOException {
|
|
|
- switch (inputType) {
|
|
|
- case SLS:
|
|
|
- for (String inputTrace : inputTraces) {
|
|
|
- startAMFromSLSTrace(inputTrace);
|
|
|
- }
|
|
|
- break;
|
|
|
- case RUMEN:
|
|
|
- long baselineTimeMS = 0;
|
|
|
- for (String inputTrace : inputTraces) {
|
|
|
- startAMFromRumenTrace(inputTrace, baselineTimeMS);
|
|
|
- }
|
|
|
- break;
|
|
|
- case SYNTH:
|
|
|
- startAMFromSynthGenerator();
|
|
|
- break;
|
|
|
- default:
|
|
|
- throw new YarnException("Input configuration not recognized, "
|
|
|
- + "trace type should be SLS, RUMEN, or SYNTH");
|
|
|
- }
|
|
|
-
|
|
|
- numAMs = amMap.size();
|
|
|
- remainingApps = numAMs;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Parse workload from a SLS trace file.
|
|
|
- */
|
|
|
- private void startAMFromSLSTrace(String inputTrace) throws IOException {
|
|
|
- JsonFactory jsonF = new JsonFactory();
|
|
|
- ObjectMapper mapper = new ObjectMapper();
|
|
|
-
|
|
|
- try (Reader input = new InputStreamReader(
|
|
|
- new FileInputStream(inputTrace), StandardCharsets.UTF_8)) {
|
|
|
- JavaType type = mapper.getTypeFactory().
|
|
|
- constructMapType(Map.class, String.class, String.class);
|
|
|
- Iterator<Map<String, String>> jobIter = mapper.readValues(
|
|
|
- jsonF.createParser(input), type);
|
|
|
-
|
|
|
- while (jobIter.hasNext()) {
|
|
|
- try {
|
|
|
- Map<String, String> jsonJob = jobIter.next();
|
|
|
- AMDefinitionSLS amDef = AMDefinitionFactory.createFromSlsTrace(jsonJob, this);
|
|
|
- startAMs(amDef);
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error("Failed to create an AM: {}", e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void startAMs(AMDefinition amDef) {
|
|
|
- for (int i = 0; i < amDef.getJobCount(); i++) {
|
|
|
- JobDefinition jobDef = JobDefinition.Builder.create()
|
|
|
- .withAmDefinition(amDef)
|
|
|
- .withDeadline(-1)
|
|
|
- .withReservationId(null)
|
|
|
- .withParams(null)
|
|
|
- .build();
|
|
|
- runNewAM(jobDef);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void startAMs(AMDefinition amDef, ReservationId reservationId,
|
|
|
- Map<String, String> params, long deadline) {
|
|
|
- for (int i = 0; i < amDef.getJobCount(); i++) {
|
|
|
- JobDefinition jobDef = JobDefinition.Builder.create()
|
|
|
- .withAmDefinition(amDef)
|
|
|
- .withReservationId(reservationId)
|
|
|
- .withParams(params)
|
|
|
- .withDeadline(deadline)
|
|
|
- .build();
|
|
|
- runNewAM(jobDef);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Parse workload from a rumen trace file.
|
|
|
- */
|
|
|
- private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS)
|
|
|
- throws IOException {
|
|
|
- Configuration conf = new Configuration();
|
|
|
- conf.set("fs.defaultFS", "file:///");
|
|
|
- File fin = new File(inputTrace);
|
|
|
-
|
|
|
- try (JobTraceReader reader = new JobTraceReader(
|
|
|
- new Path(fin.getAbsolutePath()), conf)) {
|
|
|
- LoggedJob job = reader.getNext();
|
|
|
-
|
|
|
- while (job != null) {
|
|
|
- try {
|
|
|
- AMDefinitionRumen amDef =
|
|
|
- AMDefinitionFactory.createFromRumenTrace(job, baselineTimeMS,
|
|
|
- this);
|
|
|
- startAMs(amDef);
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.error("Failed to create an AM", e);
|
|
|
- }
|
|
|
- job = reader.getNext();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
Resource getDefaultContainerResource() {
|
|
|
int containerMemory = getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB,
|
|
|
SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT);
|
|
@@ -533,31 +434,6 @@ public class SLSRunner extends Configured implements Tool {
|
|
|
return Resources.createResource(containerMemory, containerVCores);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * parse workload information from synth-generator trace files.
|
|
|
- */
|
|
|
- private void startAMFromSynthGenerator() throws YarnException, IOException {
|
|
|
- Configuration localConf = new Configuration();
|
|
|
- localConf.set("fs.defaultFS", "file:///");
|
|
|
- // if we use the nodeFile this could have been not initialized yet.
|
|
|
- if (stjp == null) {
|
|
|
- stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
|
|
|
- }
|
|
|
-
|
|
|
- SynthJob job;
|
|
|
- // we use stjp, a reference to the job producer instantiated during node
|
|
|
- // creation
|
|
|
- while ((job = (SynthJob) stjp.getNextJob()) != null) {
|
|
|
- ReservationId reservationId = null;
|
|
|
- if (job.hasDeadline()) {
|
|
|
- reservationId = ReservationId
|
|
|
- .newInstance(rm.getStartTime(), AM_ID);
|
|
|
- }
|
|
|
- AMDefinitionSynth amDef = AMDefinitionFactory.createFromSynth(job, this);
|
|
|
- startAMs(amDef, reservationId, job.getParams(), job.getDeadline());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
void increaseQueueAppNum(String queue) throws YarnException {
|
|
|
SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler();
|
|
|
String queueName = wrapper.getRealQueueName(queue);
|
|
@@ -575,43 +451,12 @@ public class SLSRunner extends Configured implements Tool {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private AMSimulator createAmSimulator(String jobType) {
|
|
|
- return (AMSimulator) ReflectionUtils.newInstance(
|
|
|
- amClassMap.get(jobType), new Configuration());
|
|
|
- }
|
|
|
-
|
|
|
- private void runNewAM(JobDefinition jobDef) {
|
|
|
- AMDefinition amDef = jobDef.getAmDefinition();
|
|
|
- String oldJobId = amDef.getOldAppId();
|
|
|
- AMSimulator amSim =
|
|
|
- createAmSimulator(amDef.getAmType());
|
|
|
-
|
|
|
- if (amSim != null) {
|
|
|
- int heartbeatInterval = getConf().getInt(
|
|
|
- SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
|
|
|
- SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
|
|
|
- boolean isTracked = trackedApps.contains(oldJobId);
|
|
|
-
|
|
|
- if (oldJobId == null) {
|
|
|
- oldJobId = Integer.toString(AM_ID);
|
|
|
- }
|
|
|
- AM_ID++;
|
|
|
- amSim.init(amDef, rm, this, isTracked, runner.getStartTimeMS(), heartbeatInterval, appIdAMSim);
|
|
|
- if (jobDef.getReservationId() != null) {
|
|
|
- // if we have a ReservationId, delegate reservation creation to
|
|
|
- // AMSim (reservation shape is impl specific)
|
|
|
- UTCClock clock = new UTCClock();
|
|
|
- amSim.initReservation(jobDef.getReservationId(), jobDef.getDeadline(),
|
|
|
- clock.getTime());
|
|
|
- }
|
|
|
- runner.schedule(amSim);
|
|
|
- maxRuntime = Math.max(maxRuntime, amDef.getJobFinishTime());
|
|
|
- numTasks += amDef.getTaskContainers().size();
|
|
|
- amMap.put(oldJobId, amSim);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
private void printSimulationInfo() {
|
|
|
+ final int numAMs = amRunner.getNumAMs();
|
|
|
+ final int numTasks = amRunner.getNumTasks();
|
|
|
+ final long maxRuntime = amRunner.getMaxRuntime();
|
|
|
+ Map<String, AMSimulator> amMap = amRunner.getAmMap();
|
|
|
+
|
|
|
if (printSimulation) {
|
|
|
// node
|
|
|
LOG.info("------------------------------------");
|
|
@@ -663,7 +508,10 @@ public class SLSRunner extends Configured implements Tool {
|
|
|
}
|
|
|
|
|
|
public static void decreaseRemainingApps() {
|
|
|
- remainingApps--;
|
|
|
+ AMRunner.REMAINING_APPS--;
|
|
|
+ if (AMRunner.REMAINING_APPS == 0) {
|
|
|
+ exitSLSRunner();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public static void exitSLSRunner() {
|
|
@@ -854,4 +702,8 @@ public class SLSRunner extends Configured implements Tool {
|
|
|
public SynthTraceJobProducer getStjp() {
|
|
|
return stjp;
|
|
|
}
|
|
|
+
|
|
|
+ public AMSimulator getAMSimulatorByAppId(ApplicationId appId) {
|
|
|
+ return amRunner.getAMSimulator(appId);
|
|
|
+ }
|
|
|
}
|