
YARN-7732. Support Generic AM Simulator from SynthGenerator. (Contributed by Young Chen via curino)

Carlo Curino committed 7 years ago
commit 84cea0011f
19 changed files with 1430 additions and 642 deletions
  1. +2 -0     hadoop-tools/hadoop-sls/pom.xml
  2. +56 -81   hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
  3. +1 -1     hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
  4. +5 -2     hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
  5. +273 -0   hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
  6. +21 -0    hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/package-info.java
  7. +214 -153 hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
  8. +0 -180   hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java
  9. +411 -76  hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
 10. +0 -121   hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java
 11. +1 -1     hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java
 12. +76 -0    hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java
 13. +76 -0    hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java
 14. +188 -25  hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java
 15. +1 -1     hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
 16. +4 -0     hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml
 17. +1 -1     hadoop-tools/hadoop-sls/src/test/resources/syn.json
 18. +54 -0    hadoop-tools/hadoop-sls/src/test/resources/syn_generic.json
 19. +46 -0    hadoop-tools/hadoop-sls/src/test/resources/syn_stream.json

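At a glance, the patch replaces SLSRunner's hard-coded map/reduce expansion with a generic task list carried by each SynthJob, so any AM simulator can be driven from the synthetic generator. The sketch below is illustrative and not part of the patch; the class name and trace path are invented, but every method it calls appears in the diffs that follow:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
    import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;

    import java.io.IOException;

    public class SynthFlowSketch {
      public static void main(String[] args) throws IOException {
        // syn_generic.json is one of the new test traces added by this commit.
        SynthTraceJobProducer producer = new SynthTraceJobProducer(
            new Configuration(),
            new Path("src/test/resources/syn_generic.json"));

        SynthJob job;
        while ((job = (SynthJob) producer.getNextJob()) != null) {
          // Jobs now expose a flat list of typed tasks instead of map/reduce
          // arrays; each task becomes one simulated container.
          for (SynthJob.SynthTask task : job.getTasks()) {
            Resource res = Resource.newInstance((int) task.getMemory(),
                (int) task.getVcores());
            System.out.println(job.getType() + "/" + task.getType() + " for "
                + task.getTime() + "ms on " + res);
          }
        }
      }
    }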
+ 2 - 0
hadoop-tools/hadoop-sls/pom.xml

@@ -133,6 +133,8 @@
             <exclude>src/test/resources/simulate.info.html.template</exclude>
             <exclude>src/test/resources/track.html.template</exclude>
             <exclude>src/test/resources/syn.json</exclude>
+            <exclude>src/test/resources/syn_generic.json</exclude>
+            <exclude>src/test/resources/syn_stream.json</exclude>
             <exclude>src/test/resources/inputsls.json</exclude>
             <exclude>src/test/resources/nodes.json</exclude>
             <exclude>src/test/resources/exit-invariants.txt</exclude>

+ 56 - 81
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java

@@ -47,13 +47,11 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.tools.rumen.JobTraceReader;
 import org.apache.hadoop.tools.rumen.LoggedJob;
 import org.apache.hadoop.tools.rumen.LoggedTask;
 import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
-import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -627,89 +625,66 @@ public class SLSRunner extends Configured implements Tool {
     localConf.set("fs.defaultFS", "file:///");
     long baselineTimeMS = 0;
 
-    try {
+    // if we use the nodeFile this could have been not initialized yet.
+    if (stjp == null) {
+      stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
+    }
 
-      // if we use the nodeFile this could have been not initialized yet.
-      if (stjp == null) {
-        stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
-      }
+    SynthJob job = null;
+    // we use stjp, a reference to the job producer instantiated during node
+    // creation
+    while ((job = (SynthJob) stjp.getNextJob()) != null) {
+      // only support MapReduce currently
+      String user = job.getUser();
+      String jobQueue = job.getQueueName();
+      String oldJobId = job.getJobID().toString();
+      long jobStartTimeMS = job.getSubmissionTime();
 
-      SynthJob job = null;
-      // we use stjp, a reference to the job producer instantiated during node
-      // creation
-      while ((job = (SynthJob) stjp.getNextJob()) != null) {
-        // only support MapReduce currently
-        String user = job.getUser();
-        String jobQueue = job.getQueueName();
-        String oldJobId = job.getJobID().toString();
-        long jobStartTimeMS = job.getSubmissionTime();
-
-        // CARLO: Finish time is only used for logging, omit for now
-        long jobFinishTimeMS = -1L;
-
-        if (baselineTimeMS == 0) {
-          baselineTimeMS = jobStartTimeMS;
-        }
-        jobStartTimeMS -= baselineTimeMS;
-        jobFinishTimeMS -= baselineTimeMS;
-        if (jobStartTimeMS < 0) {
-          LOG.warn("Warning: reset job {} start time to 0.", oldJobId);
-          jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
-          jobStartTimeMS = 0;
-        }
-
-        increaseQueueAppNum(jobQueue);
-
-        List<ContainerSimulator> containerList =
-            new ArrayList<ContainerSimulator>();
-        ArrayList<NodeId> keyAsArray = new ArrayList<NodeId>(nmMap.keySet());
-        Random rand = new Random(stjp.getSeed());
-
-        // map tasks
-        for (int i = 0; i < job.getNumberMaps(); i++) {
-          TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.MAP, i, 0);
-          RMNode node =
-              nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size())))
-                  .getNode();
-          String hostname = "/" + node.getRackName() + "/" + node.getHostName();
-          long containerLifeTime = tai.getRuntime();
-          Resource containerResource =
-              Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
-                  (int) tai.getTaskInfo().getTaskVCores());
-          containerList.add(new ContainerSimulator(containerResource,
-              containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
-        }
+      // CARLO: Finish time is only used for logging, omit for now
+      long jobFinishTimeMS = jobStartTimeMS + job.getDuration();
 
-        // reduce tasks
-        for (int i = 0; i < job.getNumberReduces(); i++) {
-          TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.REDUCE, i, 0);
-          RMNode node =
-              nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size())))
-                  .getNode();
-          String hostname = "/" + node.getRackName() + "/" + node.getHostName();
-          long containerLifeTime = tai.getRuntime();
-          Resource containerResource =
-              Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
-                  (int) tai.getTaskInfo().getTaskVCores());
-          containerList.add(
-              new ContainerSimulator(containerResource, containerLifeTime,
-                  hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
-        }
+      if (baselineTimeMS == 0) {
+        baselineTimeMS = jobStartTimeMS;
+      }
+      jobStartTimeMS -= baselineTimeMS;
+      jobFinishTimeMS -= baselineTimeMS;
+      if (jobStartTimeMS < 0) {
+        LOG.warn("Warning: reset job {} start time to 0.", oldJobId);
+        jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
+        jobStartTimeMS = 0;
+      }
 
-        ReservationId reservationId = null;
+      increaseQueueAppNum(jobQueue);
+
+      List<ContainerSimulator> containerList =
+          new ArrayList<ContainerSimulator>();
+      ArrayList<NodeId> keyAsArray = new ArrayList<NodeId>(nmMap.keySet());
+      Random rand = new Random(stjp.getSeed());
+
+      for (SynthJob.SynthTask task : job.getTasks()) {
+        RMNode node = nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size())))
+            .getNode();
+        String hostname = "/" + node.getRackName() + "/" + node.getHostName();
+        long containerLifeTime = task.getTime();
+        Resource containerResource = Resource
+            .newInstance((int) task.getMemory(), (int) task.getVcores());
+        containerList.add(
+            new ContainerSimulator(containerResource, containerLifeTime,
+                hostname, task.getPriority(), task.getType()));
+      }
 
-        if (job.hasDeadline()) {
-          reservationId =
-              ReservationId.newInstance(this.rm.getStartTime(), AM_ID);
-        }
 
-        runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId,
-            jobStartTimeMS, jobFinishTimeMS, containerList, reservationId,
-            job.getDeadline(), getAMContainerResource(null));
+      ReservationId reservationId = null;
 
+      if(job.hasDeadline()){
+        reservationId = ReservationId
+            .newInstance(this.rm.getStartTime(), AM_ID);
       }
-    } finally {
-      stjp.close();
+
+      runNewAM(job.getType(), user, jobQueue, oldJobId,
+          jobStartTimeMS, jobFinishTimeMS, containerList, reservationId,
+          job.getDeadline(), getAMContainerResource(null),
+          job.getParams());
     }
   }
 
@@ -753,14 +728,14 @@ public class SLSRunner extends Configured implements Tool {
       Resource amContainerResource) {
     runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
         jobFinishTimeMS, containerList, null,  -1,
-        amContainerResource);
+        amContainerResource, null);
   }
 
   private void runNewAM(String jobType, String user,
       String jobQueue, String oldJobId, long jobStartTimeMS,
       long jobFinishTimeMS, List<ContainerSimulator> containerList,
-      ReservationId reservationId, long deadline,
-      Resource amContainerResource) {
+      ReservationId reservationId, long deadline, Resource amContainerResource,
+      Map<String, String> params) {
 
     AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
         amClassMap.get(jobType), new Configuration());
@@ -777,7 +752,7 @@ public class SLSRunner extends Configured implements Tool {
       AM_ID++;
       amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
           jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
-          runner.getStartTimeMS(), amContainerResource);
+          runner.getStartTimeMS(), amContainerResource, params);
       if(reservationId != null) {
         // if we have a ReservationId, delegate reservation creation to
         // AMSim (reservation shape is impl specific)

+ 1 - 1
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java

@@ -121,7 +121,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
       List<ContainerSimulator> containerList, ResourceManager resourceManager,
       SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
       String simQueue, boolean tracked, String oldApp, long baseTimeMS,
-      Resource amResource) {
+      Resource amResource, Map<String, String> params) {
     super.init(startTime, startTime + 1000000L * heartbeatInterval,
         heartbeatInterval);
     this.user = simUser;

+ 5 - 2
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java

@@ -65,6 +65,9 @@ public class MRAMSimulator extends AMSimulator {
   scheduled when all maps have finished (not support slow-start currently).
   */
 
+  public static final String MAP_TYPE = "map";
+  public static final String REDUCE_TYPE = "reduce";
+
   private static final int PRIORITY_REDUCE = 10;
   private static final int PRIORITY_MAP = 20;
 
@@ -123,10 +126,10 @@ public class MRAMSimulator extends AMSimulator {
       List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
       long traceStartTime, long traceFinishTime, String user, String queue,
       boolean isTracked, String oldAppId, long baselineStartTimeMS,
-      Resource amContainerResource) {
+      Resource amContainerResource, Map<String, String> params) {
     super.init(heartbeatInterval, containerList, rm, se,
         traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId,
-        baselineStartTimeMS, amContainerResource);
+        baselineStartTimeMS, amContainerResource, params);
     amtype = "mapreduce";
 
     // get map/reduce tasks

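The two new constants give the generic task model and the MR simulator a shared vocabulary: in the SynthJob diff further down, per-type counts and resources are keyed on exactly these strings. A minimal sketch of that contract (the counts map here is fabricated for illustration):

    import org.apache.hadoop.yarn.sls.appmaster.MRAMSimulator;

    import java.util.HashMap;
    import java.util.Map;

    public class TaskTypeContractSketch {
      public static void main(String[] args) {
        // Mirrors SynthJob.taskCounts after expanding an MR-style job.
        Map<String, Integer> taskCounts = new HashMap<>();
        taskCounts.put(MRAMSimulator.MAP_TYPE, 10);
        taskCounts.put(MRAMSimulator.REDUCE_TYPE, 2);

        // This is what SynthJob.getNumberMaps()/getNumberReduces() now
        // return; note the lookup would NPE (auto-unboxing a missing key)
        // for a job that declares no "map"/"reduce" tasks.
        int maps = taskCounts.get(MRAMSimulator.MAP_TYPE);
        int reduces = taskCounts.get(MRAMSimulator.REDUCE_TYPE);
        System.out.println(maps + " maps, " + reduces + " reduces");
      }
    }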
+ 273 - 0
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java

@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.appmaster;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.SLSRunner;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AMSimulator that simulates streaming services - it keeps tasks
+ * running and resubmits them whenever they fail or complete. It finishes
+ * when the specified duration expires.
+ */
+
+@Private
+@Unstable
+public class StreamAMSimulator extends AMSimulator {
+  /*
+  Vocabulary Used:
+  pending -> requests which are NOT yet sent to RM
+  scheduled -> requests which are sent to RM but not yet assigned
+  assigned -> requests which are assigned to a container
+  completed -> request corresponding to which container has completed
+
+  streams are constantly scheduled. If a streaming job is killed, we restart it
+  */
+
+  private static final int PRIORITY_MAP = 20;
+
+  // pending streams
+  private LinkedList<ContainerSimulator> pendingStreams =
+          new LinkedList<>();
+
+  // scheduled streams
+  private LinkedList<ContainerSimulator> scheduledStreams =
+          new LinkedList<ContainerSimulator>();
+
+  // assigned streams
+  private Map<ContainerId, ContainerSimulator> assignedStreams =
+          new HashMap<ContainerId, ContainerSimulator>();
+
+  // all streams
+  private LinkedList<ContainerSimulator> allStreams =
+          new LinkedList<ContainerSimulator>();
+
+  // finished
+  private boolean isFinished = false;
+  private long duration = 0;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StreamAMSimulator.class);
+
+  @SuppressWarnings("checkstyle:parameternumber")
+  public void init(int heartbeatInterval,
+      List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
+      long traceStartTime, long traceFinishTime, String user, String queue,
+      boolean isTracked, String oldAppId, long baselineStartTimeMS,
+      Resource amContainerResource, Map<String, String> params) {
+    super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
+        traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
+        amContainerResource, params);
+    amtype = "stream";
+
+    allStreams.addAll(containerList);
+
+    duration = traceFinishTime - traceStartTime;
+
+    LOG.info("Added new job with {} streams, running for {}",
+        allStreams.size(), duration);
+  }
+
+  @Override
+  public synchronized void notifyAMContainerLaunched(Container masterContainer)
+      throws Exception {
+    if (null != masterContainer) {
+      restart();
+      super.notifyAMContainerLaunched(masterContainer);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected void processResponseQueue() throws Exception {
+    while (!responseQueue.isEmpty()) {
+      AllocateResponse response = responseQueue.take();
+
+      // check completed containers
+      if (!response.getCompletedContainersStatuses().isEmpty()) {
+        for (ContainerStatus cs : response.getCompletedContainersStatuses()) {
+          ContainerId containerId = cs.getContainerId();
+          if(assignedStreams.containsKey(containerId)){
+            // One of our containers completed. Regardless of reason,
+            // we want to maintain our streaming process
+            LOG.debug("Application {} has one streamer finished ({}).", appId,
+                containerId);
+            pendingStreams.add(assignedStreams.remove(containerId));
+          } else if (amContainer.getId().equals(containerId)){
+            // Our am container completed
+            if(cs.getExitStatus() == ContainerExitStatus.SUCCESS){
+              // am container released event (am container completed on success)
+              isAMContainerRunning = false;
+              isFinished = true;
+              LOG.info("Application {} goes to finish.", appId);
+            } else {
+              // am container killed - wait for re allocation
+              LOG.info("Application {}'s AM is "
+                  + "going to be killed. Waiting for rescheduling...", appId);
+              isAMContainerRunning = false;
+            }
+          }
+        }
+      }
+
+      // check finished
+      if (isAMContainerRunning &&
+          (System.currentTimeMillis() - simulateStartTimeMS >= duration)) {
+        LOG.debug("Application {} sends out event to clean up"
+                + " its AM container.", appId);
+        isAMContainerRunning = false;
+        isFinished = true;
+        break;
+      }
+
+      // check allocated containers
+      for (Container container : response.getAllocatedContainers()) {
+        if (!scheduledStreams.isEmpty()) {
+          ContainerSimulator cs = scheduledStreams.remove();
+          LOG.debug("Application {} starts to launch a stream ({}).", appId,
+              container.getId());
+          assignedStreams.put(container.getId(), cs);
+          se.getNmMap().get(container.getNodeId()).addNewContainer(container,
+              cs.getLifeTime());
+        }
+      }
+    }
+  }
+
+  /**
+   * restart running because of the am container killed.
+   */
+  private void restart()
+          throws YarnException, IOException, InterruptedException {
+    // clear
+    isFinished = false;
+    pendingStreams.clear();
+    pendingStreams.addAll(allStreams);
+
+    amContainer = null;
+  }
+
+  private List<ContainerSimulator> mergeLists(List<ContainerSimulator> left,
+      List<ContainerSimulator> right) {
+    List<ContainerSimulator> list = new ArrayList<>();
+    list.addAll(left);
+    list.addAll(right);
+    return list;
+  }
+
+  @Override
+  protected void sendContainerRequest()
+          throws YarnException, IOException, InterruptedException {
+
+    // send out request
+    List<ResourceRequest> ask = new ArrayList<>();
+    List<ContainerId> release = new ArrayList<>();
+    if (!isFinished) {
+      if (!pendingStreams.isEmpty()) {
+        ask = packageRequests(mergeLists(pendingStreams, scheduledStreams),
+            PRIORITY_MAP);
+        LOG.debug("Application {} sends out request for {} streams.",
+            appId, pendingStreams.size());
+        scheduledStreams.addAll(pendingStreams);
+        pendingStreams.clear();
+      }
+    }
+
+    if(isFinished){
+      release.addAll(assignedStreams.keySet());
+      ask.clear();
+    }
+
+    final AllocateRequest request = createAllocateRequest(ask, release);
+    if (totalContainers == 0) {
+      request.setProgress(1.0f);
+    } else {
+      request.setProgress((float) finishedContainers / totalContainers);
+    }
+
+    UserGroupInformation ugi =
+        UserGroupInformation.createRemoteUser(appAttemptId.toString());
+    Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
+        .get(appAttemptId.getApplicationId())
+        .getRMAppAttempt(appAttemptId).getAMRMToken();
+    ugi.addTokenIdentifier(token.decodeIdentifier());
+    AllocateResponse response = ugi.doAs(
+        new PrivilegedExceptionAction<AllocateResponse>() {
+          @Override
+          public AllocateResponse run() throws Exception {
+            return rm.getApplicationMasterService().allocate(request);
+          }
+        });
+    if (response != null) {
+      responseQueue.put(response);
+    }
+  }
+
+  @Override
+  public void initReservation(
+      ReservationId reservationId, long deadline, long now){
+    // Streaming AM currently doesn't do reservations
+    setReservationRequest(null);
+  }
+
+  @Override
+  protected void checkStop() {
+    if (isFinished) {
+      super.setEndTime(System.currentTimeMillis());
+    }
+  }
+
+  @Override
+  public void lastStep() throws Exception {
+    super.lastStep();
+
+    // clear data structures
+    allStreams.clear();
+    assignedStreams.clear();
+    pendingStreams.clear();
+    scheduledStreams.clear();
+    responseQueue.clear();
+  }
+}

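The class comment above explains the stream semantics: completed or killed containers go straight back to the pending list, and the job only ends once its configured duration elapses. For a trace job to reach this simulator at all, SLSRunner has to resolve the type string "stream" to this class; that is presumably what the four-line sls-runner.xml change in this commit provides. A hedged sketch of that lookup, assuming the existing yarn.sls.am.type.<type> property convention (treat the exact key as an assumption):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ReflectionUtils;
    import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;

    public class AmTypeLookupSketch {
      public static void main(String[] args) throws ClassNotFoundException {
        Configuration conf = new Configuration();
        // Assumed registration, mirroring the existing mapreduce entry.
        conf.set("yarn.sls.am.type.stream",
            "org.apache.hadoop.yarn.sls.appmaster.StreamAMSimulator");

        // SLSRunner keeps a jobType -> simulator-class map and instantiates
        // by reflection (see runNewAM in the SLSRunner diff above).
        Class<?> clazz =
            conf.getClassByName(conf.get("yarn.sls.am.type.stream"));
        AMSimulator amSim =
            (AMSimulator) ReflectionUtils.newInstance(clazz, conf);
        System.out.println("Resolved: " + amSim.getClass().getName());
      }
    }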
+ 21 - 0
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/package-info.java

@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Application Master simulators for the SLS.
+ */
+package org.apache.hadoop.yarn.sls.appmaster;

+ 214 - 153
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java

@@ -19,19 +19,25 @@ package org.apache.hadoop.yarn.sls.synthetic;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.commons.math3.distribution.LogNormalDistribution;
 import org.apache.commons.math3.random.JDKRandomGenerator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskStatus.State;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.tools.rumen.*;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.MapTaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskInfo;
 import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.apache.hadoop.yarn.sls.appmaster.MRAMSimulator;
 
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -46,6 +52,9 @@ public class SynthJob implements JobStory {
   @SuppressWarnings("StaticVariableName")
   private static Log LOG = LogFactory.getLog(SynthJob.class);
 
+  private static final long MIN_MEMORY = 1024;
+  private static final long MIN_VCORES = 1;
+
   private final Configuration conf;
   private final int id;
 
@@ -53,75 +62,93 @@ public class SynthJob implements JobStory {
   private static final AtomicInteger sequence = new AtomicInteger(0);
   private final String name;
   private final String queueName;
-  private final SynthJobClass jobClass;
+  private final SynthTraceJobProducer.JobDefinition jobDef;
+
+  private String type;
 
   // job timing
   private final long submitTime;
   private final long duration;
   private final long deadline;
 
-  private final int numMapTasks;
-  private final int numRedTasks;
-  private final long mapMaxMemory;
-  private final long reduceMaxMemory;
-  private final long mapMaxVcores;
-  private final long reduceMaxVcores;
-  private final long[] mapRuntime;
-  private final float[] reduceRuntime;
-  private long totMapRuntime;
-  private long totRedRuntime;
+  private Map<String, String> params;
+
+  private long totalSlotTime = 0;
+
+  // task information
+  private List<SynthTask> tasks = new ArrayList<>();
+  private Map<String, List<SynthTask>> taskByType = new HashMap<>();
+  private Map<String, Integer> taskCounts = new HashMap<>();
+  private Map<String, Long> taskMemory = new HashMap<>();
+  private Map<String, Long> taskVcores = new HashMap<>();
+
+  /**
+   * Nested class used to represent a task instance in a job. Each task
+   * corresponds to one container allocation for the job.
+   */
+  public static final class SynthTask{
+    private String type;
+    private long time;
+    private long maxMemory;
+    private long maxVcores;
+    private int priority;
+
+    private SynthTask(String type, long time, long maxMemory, long maxVcores,
+        int priority){
+      this.type = type;
+      this.time = time;
+      this.maxMemory = maxMemory;
+      this.maxVcores = maxVcores;
+      this.priority = priority;
+    }
+
+    public String getType(){
+      return type;
+    }
 
-  public SynthJob(JDKRandomGenerator rand, Configuration conf,
-      SynthJobClass jobClass, long actualSubmissionTime) {
+    public long getTime(){
+      return time;
+    }
 
-    this.conf = conf;
-    this.jobClass = jobClass;
-
-    this.duration = MILLISECONDS.convert(jobClass.getDur(), SECONDS);
-    this.numMapTasks = jobClass.getMtasks();
-    this.numRedTasks = jobClass.getRtasks();
-
-    // sample memory distributions, correct for sub-minAlloc sizes
-    long tempMapMaxMemory = jobClass.getMapMaxMemory();
-    this.mapMaxMemory = tempMapMaxMemory < MRJobConfig.DEFAULT_MAP_MEMORY_MB
-        ? MRJobConfig.DEFAULT_MAP_MEMORY_MB : tempMapMaxMemory;
-    long tempReduceMaxMemory = jobClass.getReduceMaxMemory();
-    this.reduceMaxMemory =
-            tempReduceMaxMemory < MRJobConfig.DEFAULT_REDUCE_MEMORY_MB
-            ? MRJobConfig.DEFAULT_REDUCE_MEMORY_MB : tempReduceMaxMemory;
-
-    // sample vcores distributions, correct for sub-minAlloc sizes
-    long tempMapMaxVCores = jobClass.getMapMaxVcores();
-    this.mapMaxVcores = tempMapMaxVCores < MRJobConfig.DEFAULT_MAP_CPU_VCORES
-        ? MRJobConfig.DEFAULT_MAP_CPU_VCORES : tempMapMaxVCores;
-    long tempReduceMaxVcores = jobClass.getReduceMaxVcores();
-    this.reduceMaxVcores =
-        tempReduceMaxVcores < MRJobConfig.DEFAULT_REDUCE_CPU_VCORES
-            ? MRJobConfig.DEFAULT_REDUCE_CPU_VCORES : tempReduceMaxVcores;
-
-    if (numMapTasks > 0) {
-      conf.setLong(MRJobConfig.MAP_MEMORY_MB, this.mapMaxMemory);
-      conf.set(MRJobConfig.MAP_JAVA_OPTS,
-          "-Xmx" + (this.mapMaxMemory - 100) + "m");
+    public long getMemory(){
+      return maxMemory;
     }
 
-    if (numRedTasks > 0) {
-      conf.setLong(MRJobConfig.REDUCE_MEMORY_MB, this.reduceMaxMemory);
-      conf.set(MRJobConfig.REDUCE_JAVA_OPTS,
-          "-Xmx" + (this.reduceMaxMemory - 100) + "m");
+    public long getVcores(){
+      return maxVcores;
     }
 
-    boolean hasDeadline =
-        (rand.nextDouble() <= jobClass.jobClass.chance_of_reservation);
+    public int getPriority(){
+      return priority;
+    }
+
+    @Override
+    public String toString(){
+      return String.format("[task]\ttype: %1$-10s\ttime: %2$3s\tmemory: "
+              + "%3$4s\tvcores: %4$2s%n", getType(), getTime(), getMemory(),
+          getVcores());
+    }
+  }
 
-    LogNormalDistribution deadlineFactor =
-        SynthUtils.getLogNormalDist(rand, jobClass.jobClass.deadline_factor_avg,
-            jobClass.jobClass.deadline_factor_stddev);
 
-    double deadlineFactorSample =
-        (deadlineFactor != null) ? deadlineFactor.sample() : -1;
+  protected SynthJob(JDKRandomGenerator rand, Configuration conf,
+      SynthTraceJobProducer.JobDefinition jobDef,
+      String queue, long actualSubmissionTime) {
 
-    this.queueName = jobClass.workload.getQueueName();
+    this.conf = conf;
+    this.jobDef = jobDef;
+
+    this.queueName = queue;
+
+    this.duration = MILLISECONDS.convert(jobDef.duration.getInt(),
+        SECONDS);
+
+    boolean hasDeadline =
+        (rand.nextDouble() <= jobDef.reservation.getDouble());
+
+    double deadlineFactorSample = jobDef.deadline_factor.getDouble();
+
+    this.type = jobDef.type;
 
     this.submitTime = MILLISECONDS.convert(actualSubmissionTime, SECONDS);
 
@@ -129,6 +156,8 @@ public class SynthJob implements JobStory {
         hasDeadline ? MILLISECONDS.convert(actualSubmissionTime, SECONDS)
             + (long) Math.ceil(deadlineFactorSample * duration) : -1;
 
+    this.params = jobDef.params;
+
     conf.set(QUEUE_NAME, queueName);
 
     // name and initialize job randomness
@@ -136,79 +165,166 @@ public class SynthJob implements JobStory {
     rand.setSeed(seed);
     id = sequence.getAndIncrement();
 
-    name = String.format(jobClass.getClassName() + "_%06d", id);
+    name = String.format(jobDef.class_name + "_%06d", id);
     LOG.debug(name + " (" + seed + ")");
 
     LOG.info("JOB TIMING`: job: " + name + " submission:" + submitTime
         + " deadline:" + deadline + " duration:" + duration
         + " deadline-submission: " + (deadline - submitTime));
 
-    // generate map and reduce runtimes
-    mapRuntime = new long[numMapTasks];
-    for (int i = 0; i < numMapTasks; i++) {
-      mapRuntime[i] = jobClass.getMapTimeSample();
-      totMapRuntime += mapRuntime[i];
-    }
-    reduceRuntime = new float[numRedTasks];
-    for (int i = 0; i < numRedTasks; i++) {
-      reduceRuntime[i] = jobClass.getReduceTimeSample();
-      totRedRuntime += (long) Math.ceil(reduceRuntime[i]);
+    // Expand tasks
+    for(SynthTraceJobProducer.TaskDefinition task : jobDef.tasks){
+      int num = task.count.getInt();
+      String taskType = task.type;
+      long memory = task.max_memory.getLong();
+      memory = memory < MIN_MEMORY ? MIN_MEMORY: memory;
+      long vcores = task.max_vcores.getLong();
+      vcores = vcores < MIN_VCORES ? MIN_VCORES  : vcores;
+      int priority = task.priority;
+
+      // Save task information by type
+      taskByType.put(taskType, new ArrayList<>());
+      taskCounts.put(taskType, num);
+      taskMemory.put(taskType, memory);
+      taskVcores.put(taskType, vcores);
+
+      for(int i = 0; i < num; ++i){
+        long time = task.time.getLong();
+        totalSlotTime += time;
+        SynthTask t = new SynthTask(taskType, time, memory, vcores,
+            priority);
+        tasks.add(t);
+        taskByType.get(taskType).add(t);
+      }
     }
+
+  }
+
+  public String getType(){
+    return type;
+  }
+
+  public List<SynthTask> getTasks(){
+    return tasks;
   }
 
   public boolean hasDeadline() {
     return deadline > 0;
   }
 
-  @Override
   public String getName() {
     return name;
   }
 
-  @Override
   public String getUser() {
-    return jobClass.getUserName();
+    return jobDef.user_name;
   }
 
-  @Override
   public JobID getJobID() {
     return new JobID("job_mock_" + name, id);
   }
 
+  public long getSubmissionTime() {
+    return submitTime;
+  }
+
+  public String getQueueName() {
+    return queueName;
+  }
+
   @Override
-  public Values getOutcome() {
-    return Values.SUCCESS;
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    String res = "\nSynthJob [" + jobDef.class_name + "]: \n"
+        + "\tname: " + getName() + "\n"
+        + "\ttype: " + getType() + "\n"
+        + "\tid: " + id + "\n"
+        + "\tqueue: " + getQueueName() + "\n"
+        + "\tsubmission: " + getSubmissionTime() + "\n"
+        + "\tduration: " + getDuration() + "\n"
+        + "\tdeadline: " + getDeadline() + "\n";
+    sb.append(res);
+    int taskno = 0;
+    for(SynthJob.SynthTask t : getTasks()){
+      sb.append("\t");
+      sb.append(taskno);
+      sb.append(": \t");
+      sb.append(t.toString());
+      taskno++;
+    }
+    return sb.toString();
+  }
+
+  public long getTotalSlotTime() {
+    return totalSlotTime;
+  }
+
+  public long getDuration() {
+    return duration;
+  }
+
+  public long getDeadline() {
+    return deadline;
+  }
+
+  public Map<String, String> getParams() {
+    return params;
   }
 
   @Override
-  public long getSubmissionTime() {
-    return submitTime;
+  public boolean equals(Object other) {
+    if (!(other instanceof SynthJob)) {
+      return false;
+    }
+    SynthJob o = (SynthJob) other;
+    return tasks.equals(o.tasks)
+        && submitTime == o.submitTime
+        && type.equals(o.type)
+        && queueName.equals(o.queueName)
+        && jobDef.class_name.equals(o.jobDef.class_name);
+  }
+
+  @Override
+  public int hashCode() {
+    return jobDef.class_name.hashCode()
+        * (int) submitTime * (int) duration;
+  }
+
+
+  @Override
+  public JobConf getJobConf() {
+    return new JobConf(conf);
   }
 
   @Override
   public int getNumberMaps() {
-    return numMapTasks;
+    return taskCounts.get(MRAMSimulator.MAP_TYPE);
   }
 
   @Override
   public int getNumberReduces() {
-    return numRedTasks;
+    return taskCounts.get(MRAMSimulator.REDUCE_TYPE);
+  }
+
+  @Override
+  public InputSplit[] getInputSplits() {
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
-    switch (taskType) {
+    switch(taskType){
     case MAP:
-      return new TaskInfo(-1, -1, -1, -1, mapMaxMemory, mapMaxVcores);
+      return new TaskInfo(-1, -1, -1, -1,
+          taskMemory.get(MRAMSimulator.MAP_TYPE),
+          taskVcores.get(MRAMSimulator.MAP_TYPE));
     case REDUCE:
-      return new TaskInfo(-1, -1, -1, -1, reduceMaxMemory, reduceMaxVcores);
+      return new TaskInfo(-1, -1, -1, -1,
+          taskMemory.get(MRAMSimulator.REDUCE_TYPE),
+          taskVcores.get(MRAMSimulator.REDUCE_TYPE));
     default:
-      throw new IllegalArgumentException("Not interested");
+      break;
     }
-  }
-
-  @Override
-  public InputSplit[] getInputSplits() {
     throw new UnsupportedOperationException();
   }
 
@@ -218,17 +334,20 @@ public class SynthJob implements JobStory {
     switch (taskType) {
     case MAP:
       return new MapTaskAttemptInfo(State.SUCCEEDED,
-          getTaskInfo(taskType, taskNumber), mapRuntime[taskNumber], null);
-
+          getTaskInfo(taskType, taskNumber),
+          taskByType.get(MRAMSimulator.MAP_TYPE).get(taskNumber).time,
+          null);
     case REDUCE:
       // We assume uniform split between pull/sort/reduce
       // aligned with naive progress reporting assumptions
       return new ReduceTaskAttemptInfo(State.SUCCEEDED,
           getTaskInfo(taskType, taskNumber),
-          (long) Math.round((reduceRuntime[taskNumber] / 3)),
-          (long) Math.round((reduceRuntime[taskNumber] / 3)),
-          (long) Math.round((reduceRuntime[taskNumber] / 3)), null);
-
+          taskByType.get(MRAMSimulator.MAP_TYPE)
+              .get(taskNumber).time / 3,
+          taskByType.get(MRAMSimulator.MAP_TYPE)
+              .get(taskNumber).time / 3,
+          taskByType.get(MRAMSimulator.MAP_TYPE)
+              .get(taskNumber).time / 3, null);
     default:
       break;
     }
@@ -242,65 +361,7 @@ public class SynthJob implements JobStory {
   }
 
   @Override
-  public org.apache.hadoop.mapred.JobConf getJobConf() {
-    return new JobConf(conf);
-  }
-
-  @Override
-  public String getQueueName() {
-    return queueName;
-  }
-
-  @Override
-  public String toString() {
-    return "SynthJob [\n" + "  workload=" + jobClass.getWorkload().getId()
-        + "\n" + "  jobClass="
-        + jobClass.getWorkload().getClassList().indexOf(jobClass) + "\n"
-        + "  conf=" + conf + ",\n" + "  id=" + id + ",\n" + "  name=" + name
-        + ",\n" + "  mapRuntime=" + Arrays.toString(mapRuntime) + ",\n"
-        + "  reduceRuntime=" + Arrays.toString(reduceRuntime) + ",\n"
-        + "  submitTime=" + submitTime + ",\n" + "  numMapTasks=" + numMapTasks
-        + ",\n" + "  numRedTasks=" + numRedTasks + ",\n" + "  mapMaxMemory="
-        + mapMaxMemory + ",\n" + "  reduceMaxMemory=" + reduceMaxMemory + ",\n"
-        + "  queueName=" + queueName + "\n" + "]";
-  }
-
-  public SynthJobClass getJobClass() {
-    return jobClass;
-  }
-
-  public long getTotalSlotTime() {
-    return totMapRuntime + totRedRuntime;
-  }
-
-  public long getDuration() {
-    return duration;
-  }
-
-  public long getDeadline() {
-    return deadline;
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (!(other instanceof SynthJob)) {
-      return false;
-    }
-    SynthJob o = (SynthJob) other;
-    return Arrays.equals(mapRuntime, o.mapRuntime)
-        && Arrays.equals(reduceRuntime, o.reduceRuntime)
-        && submitTime == o.submitTime && numMapTasks == o.numMapTasks
-        && numRedTasks == o.numRedTasks && mapMaxMemory == o.mapMaxMemory
-        && reduceMaxMemory == o.reduceMaxMemory
-        && mapMaxVcores == o.mapMaxVcores
-        && reduceMaxVcores == o.reduceMaxVcores && queueName.equals(o.queueName)
-        && jobClass.equals(o.jobClass) && totMapRuntime == o.totMapRuntime
-        && totRedRuntime == o.totRedRuntime;
-  }
-
-  @Override
-  public int hashCode() {
-    // could have a bad distr; investigate if a relevant use case exists
-    return jobClass.hashCode() * (int) submitTime;
+  public Values getOutcome() {
+    return Values.SUCCESS;
   }
 }

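One detail worth pulling out of the SynthJob constructor above: a job receives a deadline only when the reservation coin-flip succeeds, and the deadline is the submission time plus ceil(deadline_factor * duration). A worked example with made-up numbers (variable names mirror the diff; the values are illustrative):

    public class DeadlineSketch {
      public static void main(String[] args) {
        long submitTime = 10_000L;          // ms, from the trace
        long duration = 600_000L;           // ms, jobDef.duration sample
        double deadlineFactorSample = 1.5;  // jobDef.deadline_factor sample
        boolean hasDeadline = true;         // reservation coin-flip succeeded

        long deadline = hasDeadline
            ? submitTime + (long) Math.ceil(deadlineFactorSample * duration)
            : -1L;                          // hasDeadline() tests deadline > 0
        System.out.println(deadline);       // 910000 ms, 900 s after submit
      }
    }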
+ 0 - 180
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java

@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.sls.synthetic;
-
-import org.apache.commons.math3.distribution.AbstractRealDistribution;
-import org.apache.commons.math3.distribution.LogNormalDistribution;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.tools.rumen.JobStory;
-import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.JobClass;
-import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace;
-
-/**
- * This is a class that represent a class of Jobs. It is used to generate an
- * individual job, by picking random durations, task counts, container size,
- * etc.
- */
-public class SynthJobClass {
-
-  private final JDKRandomGenerator rand;
-  private final LogNormalDistribution dur;
-  private final LogNormalDistribution mapRuntime;
-  private final LogNormalDistribution redRuntime;
-  private final LogNormalDistribution mtasks;
-  private final LogNormalDistribution rtasks;
-  private final LogNormalDistribution mapMem;
-  private final LogNormalDistribution redMem;
-  private final LogNormalDistribution mapVcores;
-  private final LogNormalDistribution redVcores;
-
-  private final Trace trace;
-  @SuppressWarnings("VisibilityModifier")
-  protected final SynthWorkload workload;
-  @SuppressWarnings("VisibilityModifier")
-  protected final JobClass jobClass;
-
-  public SynthJobClass(JDKRandomGenerator rand, Trace trace,
-      SynthWorkload workload, int classId) {
-
-    this.trace = trace;
-    this.workload = workload;
-    this.rand = new JDKRandomGenerator();
-    this.rand.setSeed(rand.nextLong());
-    jobClass = trace.workloads.get(workload.getId()).job_classes.get(classId);
-
-    this.dur = SynthUtils.getLogNormalDist(rand, jobClass.dur_avg,
-        jobClass.dur_stddev);
-    this.mapRuntime = SynthUtils.getLogNormalDist(rand, jobClass.mtime_avg,
-        jobClass.mtime_stddev);
-    this.redRuntime = SynthUtils.getLogNormalDist(rand, jobClass.rtime_avg,
-        jobClass.rtime_stddev);
-    this.mtasks = SynthUtils.getLogNormalDist(rand, jobClass.mtasks_avg,
-        jobClass.mtasks_stddev);
-    this.rtasks = SynthUtils.getLogNormalDist(rand, jobClass.rtasks_avg,
-        jobClass.rtasks_stddev);
-
-    this.mapMem = SynthUtils.getLogNormalDist(rand, jobClass.map_max_memory_avg,
-        jobClass.map_max_memory_stddev);
-    this.redMem = SynthUtils.getLogNormalDist(rand,
-        jobClass.reduce_max_memory_avg, jobClass.reduce_max_memory_stddev);
-    this.mapVcores = SynthUtils.getLogNormalDist(rand,
-        jobClass.map_max_vcores_avg, jobClass.map_max_vcores_stddev);
-    this.redVcores = SynthUtils.getLogNormalDist(rand,
-        jobClass.reduce_max_vcores_avg, jobClass.reduce_max_vcores_stddev);
-  }
-
-  public JobStory getJobStory(Configuration conf, long actualSubmissionTime) {
-    return new SynthJob(rand, conf, this, actualSubmissionTime);
-  }
-
-  @Override
-  public String toString() {
-    return "SynthJobClass [workload=" + workload.getName() + ", class="
-        + jobClass.class_name + " job_count=" + jobClass.class_weight + ", dur="
-        + ((dur != null) ? dur.getNumericalMean() : 0) + ", mapRuntime="
-        + ((mapRuntime != null) ? mapRuntime.getNumericalMean() : 0)
-        + ", redRuntime="
-        + ((redRuntime != null) ? redRuntime.getNumericalMean() : 0)
-        + ", mtasks=" + ((mtasks != null) ? mtasks.getNumericalMean() : 0)
-        + ", rtasks=" + ((rtasks != null) ? rtasks.getNumericalMean() : 0)
-        + ", chance_of_reservation=" + jobClass.chance_of_reservation + "]\n";
-
-  }
-
-  public double getClassWeight() {
-    return jobClass.class_weight;
-  }
-
-  public long getDur() {
-    return genLongSample(dur);
-  }
-
-  public int getMtasks() {
-    return genIntSample(mtasks);
-  }
-
-  public int getRtasks() {
-    return genIntSample(rtasks);
-  }
-
-  public long getMapMaxMemory() {
-    return genLongSample(mapMem);
-  }
-
-  public long getReduceMaxMemory() {
-    return genLongSample(redMem);
-  }
-
-  public long getMapMaxVcores() {
-    return genLongSample(mapVcores);
-  }
-
-  public long getReduceMaxVcores() {
-    return genLongSample(redVcores);
-  }
-
-  public SynthWorkload getWorkload() {
-    return workload;
-  }
-
-  public int genIntSample(AbstractRealDistribution dist) {
-    if (dist == null) {
-      return 0;
-    }
-    double baseSample = dist.sample();
-    if (baseSample < 0) {
-      baseSample = 0;
-    }
-    return (int) (Integer.MAX_VALUE & (long) Math.ceil(baseSample));
-  }
-
-  public long genLongSample(AbstractRealDistribution dist) {
-    return dist != null ? (long) Math.ceil(dist.sample()) : 0;
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (!(other instanceof SynthJobClass)) {
-      return false;
-    }
-    SynthJobClass o = (SynthJobClass) other;
-    return workload.equals(o.workload);
-  }
-
-  @Override
-  public int hashCode() {
-    return workload.hashCode() * workload.getId();
-  }
-
-  public String getClassName() {
-    return jobClass.class_name;
-  }
-
-  public long getMapTimeSample() {
-    return genLongSample(mapRuntime);
-  }
-
-  public long getReduceTimeSample() {
-    return genLongSample(redRuntime);
-  }
-
-  public String getUserName() {
-    return jobClass.user_name;
-  }
-}

+ 411 - 76
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.sls.synthetic;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math3.distribution.AbstractRealDistribution;
 import org.apache.commons.math3.random.JDKRandomGenerator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -26,7 +27,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.tools.rumen.JobStory;
 import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.sls.appmaster.MRAMSimulator;
+import org.codehaus.jackson.annotate.JsonCreator;
 import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 
 import javax.xml.bind.annotation.XmlRootElement;
@@ -39,7 +44,7 @@ import static org.codehaus.jackson.map.DeserializationConfig.Feature.FAIL_ON_UNK
 
 /**
  * This is a JobStoryProducer that operates from distribution of different
- * workloads. The .json input file is used to determine how many jobs, which
+ * workloads. The .json input file is used to determine how many weight, which
  * size, number of maps/reducers and their duration, as well as the temporal
  * distributed of submissions. For each parameter we control avg and stdev, and
  * generate values via normal or log-normal distributions.
@@ -55,8 +60,6 @@ public class SynthTraceJobProducer implements JobStoryProducer {
   private final long seed;
 
   private int totalWeight;
-  private final List<Double> weightList;
-  private final Map<Integer, SynthWorkload> workloads;
 
   private final Queue<StoryParams> listStoryParams;
 
@@ -65,6 +68,9 @@ public class SynthTraceJobProducer implements JobStoryProducer {
   public static final String SLS_SYNTHETIC_TRACE_FILE =
       "sls.synthetic" + ".trace_file";
 
+  private final static int DEFAULT_MAPPER_PRIORITY = 20;
+  private final static int DEFAULT_REDUCER_PRIORITY = 10;
+
   public SynthTraceJobProducer(Configuration conf) throws IOException {
     this(conf, new Path(conf.get(SLS_SYNTHETIC_TRACE_FILE)));
   }
@@ -76,8 +82,6 @@ public class SynthTraceJobProducer implements JobStoryProducer {
 
     this.conf = conf;
     this.rand = new JDKRandomGenerator();
-    workloads = new HashMap<Integer, SynthWorkload>();
-    weightList = new ArrayList<Double>();
 
     ObjectMapper mapper = new ObjectMapper();
     mapper.configure(INTERN_FIELD_NAMES, true);
@@ -86,44 +90,132 @@ public class SynthTraceJobProducer implements JobStoryProducer {
     FileSystem ifs = path.getFileSystem(conf);
     FileSystem ifs = path.getFileSystem(conf);
     FSDataInputStream fileIn = ifs.open(path);
     FSDataInputStream fileIn = ifs.open(path);
 
 
+    // Initialize the random generator and the seed
     this.trace = mapper.readValue(fileIn, Trace.class);
     this.trace = mapper.readValue(fileIn, Trace.class);
-    seed = trace.rand_seed;
-    rand.setSeed(seed);
+    this.seed = trace.rand_seed;
+    this.rand.setSeed(seed);
+    // Initialize the trace
+    this.trace.init(rand);
 
 
     this.numJobs = new AtomicInteger(trace.num_jobs);
     this.numJobs = new AtomicInteger(trace.num_jobs);
 
 
-    for (int workloadId = 0; workloadId < trace.workloads
-        .size(); workloadId++) {
-      SynthWorkload workload = new SynthWorkload(workloadId, trace);
-      for (int classId =
-          0; classId < trace.workloads.get(workloadId).job_classes
-              .size(); classId++) {
-        SynthJobClass cls = new SynthJobClass(rand, trace, workload, classId);
-        workload.add(cls);
-      }
-      workloads.put(workloadId, workload);
+    for (Double w : trace.workload_weights) {
+      totalWeight += w;
     }
     }
 
 
-    for (int i = 0; i < workloads.size(); i++) {
-      double w = workloads.get(i).getWorkloadWeight();
-      totalWeight += w;
-      weightList.add(w);
+    // Initialize our story parameters
+    listStoryParams = createStory();
+
+    LOG.info("Generated " + listStoryParams.size() + " deadlines for "
+        + this.numJobs.get() + " jobs");
+  }
+
+  // StoryParams hold the minimum amount of information needed to completely
+  // specify a job run: job definition, start time, and queue.
+  // This allows us to create "jobs" and then order them according to start time
+  static class StoryParams {
+    // Time at which the job gets submitted
+    private long actualSubmissionTime;
+    // The queue the job gets submitted to
+    private String queue;
+    // Definition to construct the job from
+    private JobDefinition jobDef;
+
+    StoryParams(long actualSubmissionTime, String queue, JobDefinition jobDef) {
+      this.actualSubmissionTime = actualSubmissionTime;
+      this.queue = queue;
+      this.jobDef = jobDef;
     }
+  }
+
+  private Queue<StoryParams> createStory() {
     // create priority queue to keep start-time sorted
-    listStoryParams =
-        new PriorityQueue<StoryParams>(10, new Comparator<StoryParams>() {
+    Queue<StoryParams> storyQueue =
+        new PriorityQueue<>(this.numJobs.get(), new Comparator<StoryParams>() {
           @Override
           public int compare(StoryParams o1, StoryParams o2) {
             return Math
-                .toIntExact(o2.actualSubmissionTime - o1.actualSubmissionTime);
+                .toIntExact(o1.actualSubmissionTime - o2.actualSubmissionTime);
           }
         });
+    for (int i = 0; i < numJobs.get(); i++) {
+      // Generate a workload
+      Workload wl = trace.generateWorkload();
+      // Save all the parameters needed to completely define a job
+      long actualSubmissionTime = wl.generateSubmissionTime();
+      String queue = wl.queue_name;
+      JobDefinition job = wl.generateJobDefinition();
+      storyQueue.add(new StoryParams(actualSubmissionTime, queue, job));
+    }
+    return storyQueue;
+  }
-    // initialize it
-    createStoryParams();
-    LOG.info("Generated " + listStoryParams.size() + " deadlines for "
-        + this.numJobs.get() + " jobs ");
+  @Override
+  public JobStory getNextJob() throws IOException {
+    if (numJobs.decrementAndGet() < 0) {
+      return null;
+    }
+    StoryParams storyParams = listStoryParams.poll();
+    return new SynthJob(rand, conf, storyParams.jobDef, storyParams.queue,
+        storyParams.actualSubmissionTime);
+  }
+
+  @Override
+  public void close(){
+  }
+
+  @Override
+  public String toString() {
+    return "SynthTraceJobProducer [ conf=" + conf + ", numJobs=" + numJobs
+        + ", r=" + rand + ", totalWeight="
+        + totalWeight + ", workloads=" + trace.workloads + "]";
+  }
+
+  public int getNumJobs() {
+    return trace.num_jobs;
+  }
+
+  // Helper to parse and maintain backwards compatibility with
+  // syn json formats
+  private static void validateJobDef(JobDefinition jobDef){
+    if(jobDef.tasks == null) {
+      LOG.info("Detected old JobDefinition format. Converting.");
+      try {
+        jobDef.tasks = new ArrayList<>();
+        jobDef.type = "mapreduce";
+        jobDef.deadline_factor = new Sample(jobDef.deadline_factor_avg,
+            jobDef.deadline_factor_stddev);
+        jobDef.duration = new Sample(jobDef.dur_avg,
+            jobDef.dur_stddev);
+        jobDef.reservation = new Sample(jobDef.chance_of_reservation);
+
+        TaskDefinition map = new TaskDefinition();
+        map.type = MRAMSimulator.MAP_TYPE;
+        map.count = new Sample(jobDef.mtasks_avg, jobDef.mtasks_stddev);
+        map.time = new Sample(jobDef.mtime_avg, jobDef.mtime_stddev);
+        map.max_memory = new Sample((double) jobDef.map_max_memory_avg,
+            jobDef.map_max_memory_stddev);
+        map.max_vcores = new Sample((double) jobDef.map_max_vcores_avg,
+            jobDef.map_max_vcores_stddev);
+        map.priority = DEFAULT_MAPPER_PRIORITY;
+
+        jobDef.tasks.add(map);
+        TaskDefinition reduce = new TaskDefinition();
+        reduce.type = MRAMSimulator.REDUCE_TYPE;
+        reduce.count = new Sample(jobDef.rtasks_avg, jobDef.rtasks_stddev);
+        reduce.time = new Sample(jobDef.rtime_avg, jobDef.rtime_stddev);
+        reduce.max_memory = new Sample((double) jobDef.reduce_max_memory_avg,
+            jobDef.reduce_max_memory_stddev);
+        reduce.max_vcores = new Sample((double) jobDef.reduce_max_vcores_avg,
+            jobDef.reduce_max_vcores_stddev);
+        reduce.priority = DEFAULT_REDUCER_PRIORITY;
+
+        jobDef.tasks.add(reduce);
+      } catch (JsonMappingException e) {
+        LOG.warn("Error converting old JobDefinition format", e);
+      }
+    }
   }

   public long getSeed() {
@@ -159,6 +251,25 @@ public class SynthTraceJobProducer implements JobStoryProducer {
     @JsonProperty("workloads")
     @JsonProperty("workloads")
     List<Workload> workloads;
     List<Workload> workloads;
 
 
+    List<Double> workload_weights;
+    JDKRandomGenerator rand;
+
+    public void init(JDKRandomGenerator random){
+      this.rand = random;
+      // Pass rand forward
+      for(Workload w : workloads){
+        w.init(rand);
+      }
+      // Initialize workload weights
+      workload_weights = new ArrayList<>();
+      for(Workload w : workloads){
+        workload_weights.add(w.workload_weight);
+      }
+    }
+
+    Workload generateWorkload(){
+      return workloads.get(SynthUtils.getWeighted(workload_weights, rand));
+    }
   }

   /**
@@ -174,16 +285,67 @@ public class SynthTraceJobProducer implements JobStoryProducer {
     @JsonProperty("queue_name")
     @JsonProperty("queue_name")
     String queue_name;
     String queue_name;
     @JsonProperty("job_classes")
     @JsonProperty("job_classes")
-    List<JobClass> job_classes;
+    List<JobDefinition> job_classes;
     @JsonProperty("time_distribution")
     @JsonProperty("time_distribution")
     List<TimeSample> time_distribution;
     List<TimeSample> time_distribution;
+
+    JDKRandomGenerator rand;
+
+    List<Double> job_weights;
+    List<Double> time_weights;
+
+    public void init(JDKRandomGenerator random){
+      this.rand = random;
+      // Validate and pass rand forward
+      for(JobDefinition def : job_classes){
+        validateJobDef(def);
+        def.init(rand);
+      }
+
+      // Initialize job weights
+      job_weights = new ArrayList<>();
+      for(JobDefinition j : job_classes){
+        job_weights.add(j.class_weight);
+      }
+
+      // Initialize time weights
+      time_weights = new ArrayList<>();
+      for(TimeSample ts : time_distribution){
+        time_weights.add(ts.weight);
+      }
+    }
+
+    public long generateSubmissionTime(){
+      int index = SynthUtils.getWeighted(time_weights, rand);
+      // Retrieve the lower and upper bounds for this time "bucket"
+      int start = time_distribution.get(index).time;
+      // Get the beginning of the next time sample (if it exists)
+      index = (index+1)<time_distribution.size() ? index+1 : index;
+      int end = time_distribution.get(index).time;
+      int range = end-start;
+      // Within this time "bucket", uniformly pick a time if our
+      // range is non-zero, otherwise just use the start time of the bucket
+      return start + (range>0 ? rand.nextInt(range) : 0);
+    }
+
+    public JobDefinition generateJobDefinition(){
+      return job_classes.get(SynthUtils.getWeighted(job_weights, rand));
+    }
+
+    @Override
+    public String toString(){
+      return "\nWorkload " + workload_name + ", weight: " + workload_weight
+          + ", queue: " + queue_name + " "
+          + job_classes.toString().replace("\n", "\n\t");
+    }
   }

   /**
    * Class used to parse a job class from file.
    */
   @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
-  public static class JobClass {
+  public static class JobDefinition {

     @JsonProperty("class_name")
     String class_name;
@@ -194,6 +356,23 @@ public class SynthTraceJobProducer implements JobStoryProducer {
     @JsonProperty("class_weight")
     @JsonProperty("class_weight")
     double class_weight;
     double class_weight;
 
 
+    // am type to launch
+    @JsonProperty("type")
+    String type;
+    @JsonProperty("deadline_factor")
+    Sample deadline_factor;
+    @JsonProperty("duration")
+    Sample duration;
+    @JsonProperty("reservation")
+    Sample reservation;
+
+    @JsonProperty("tasks")
+    List<TaskDefinition> tasks;
+
+    @JsonProperty("params")
+    Map<String, String> params;
+
+    // Old JSON fields for backwards compatibility
     // reservation related params
     @JsonProperty("chance_of_reservation")
     double chance_of_reservation;
@@ -246,71 +425,227 @@ public class SynthTraceJobProducer implements JobStoryProducer {
     @JsonProperty("reduce_max_vcores_stddev")
     @JsonProperty("reduce_max_vcores_stddev")
     double reduce_max_vcores_stddev;
     double reduce_max_vcores_stddev;
 
 
+    public void init(JDKRandomGenerator rand){
+      deadline_factor.init(rand);
+      duration.init(rand);
+      reservation.init(rand);
+
+      for(TaskDefinition t : tasks){
+        t.count.init(rand);
+        t.time.init(rand);
+        t.max_memory.init(rand);
+        t.max_vcores.init(rand);
+      }
+    }
+
+    @Override
+    public String toString(){
+      return "\nJobDefinition " + class_name + ", weight: " + class_weight
+          + ", type: " + type + " "
+          + tasks.toString().replace("\n", "\n\t");
+    }
   }

   /**
-   * This is used to define time-varying probability of a job start-time (e.g.,
-   * to simulate daily patterns).
+   * A task definition representing one type of container, e.g. "map" in mapreduce.
    */
   @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
-  public static class TimeSample {
-    // in sec
+  public static class TaskDefinition {
+
+    @JsonProperty("type")
+    String type;
+    @JsonProperty("count")
+    Sample count;
     @JsonProperty("time")
     @JsonProperty("time")
-    int time;
-    @JsonProperty("weight")
-    double jobs;
+    Sample time;
+    @JsonProperty("max_memory")
+    Sample max_memory;
+    @JsonProperty("max_vcores")
+    Sample max_vcores;
+    @JsonProperty("priority")
+    int priority;
+
+    @Override
+    public String toString(){
+      return "\nTaskDefinition " + type
+          + " Count[" + count + "] Time[" + time + "] Memory[" + max_memory
+          + "] Vcores[" + max_vcores + "] Priority[" + priority + "]";
+    }
   }

-  static class StoryParams {
-    private SynthJobClass pickedJobClass;
-    private long actualSubmissionTime;
+  /**
+   * Class used to parse value sample information.
+   */
+  @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
+  public static class Sample {
+    private static final Dist DEFAULT_DIST = Dist.LOGNORM;
+
+    private final double val;
+    private final double std;
+    private final Dist dist;
+    private AbstractRealDistribution dist_instance;
+    private final List<String> discrete;
+    private final List<Double> weights;
+    private final Mode mode;
+
+    private JDKRandomGenerator rand;
+
+    private enum Mode{
+      CONST,
+      DIST,
+      DISC
+    }
-    StoryParams(SynthJobClass pickedJobClass, long actualSubmissionTime) {
-      this.pickedJobClass = pickedJobClass;
-      this.actualSubmissionTime = actualSubmissionTime;
+    private enum Dist{
+      LOGNORM,
+      NORM
     }
-  }
+    public Sample(Double val) throws JsonMappingException{
+      this(val, null);
+    }
-  void createStoryParams() {
+    public Sample(Double val, Double std) throws JsonMappingException{
+      this(val, std, null, null, null);
+    }
-    for (int i = 0; i < numJobs.get(); i++) {
-      int workload = SynthUtils.getWeighted(weightList, rand);
-      SynthWorkload pickedWorkload = workloads.get(workload);
-      long jobClass =
-          SynthUtils.getWeighted(pickedWorkload.getWeightList(), rand);
-      SynthJobClass pickedJobClass =
-          pickedWorkload.getClassList().get((int) jobClass);
-      long actualSubmissionTime = pickedWorkload.getBaseSubmissionTime(rand);
-      // long actualSubmissionTime = (i + 1) * 10;
-      listStoryParams
-          .add(new StoryParams(pickedJobClass, actualSubmissionTime));
+    @JsonCreator
+    public Sample(@JsonProperty("val") Double val,
+        @JsonProperty("std") Double std, @JsonProperty("dist") String dist,
+        @JsonProperty("discrete") List<String> discrete,
+        @JsonProperty("weights") List<Double> weights)
+        throws JsonMappingException{
+      // Different Modes
+      // - Constant: val must be specified, all else null. Sampling will
+      // return val.
+      // - Distribution: val, std specified, dist optional (defaults to
+      // LogNormal). Sampling will sample from the appropriate distribution
+      // - Discrete: discrete must be set to a list of strings or numbers,
+      // weights optional (defaults to uniform)
+
+      if(val!=null){
+        if(std==null){
+          // Constant
+          if(dist!=null || discrete!=null || weights!=null){
+            throw new JsonMappingException("Instantiation of " + Sample.class
+                + " failed");
+          }
+          mode = Mode.CONST;
+          this.val = val;
+          this.std = 0;
+          this.dist = null;
+          this.discrete = null;
+          this.weights = null;
+        } else {
+          // Distribution
+          if(discrete!=null || weights != null){
+            throw new JsonMappingException("Instantiation of " + Sample.class
+                + " failed");
+          }
+          mode = Mode.DIST;
+          this.val = val;
+          this.std = std;
+          this.dist = dist!=null ? Dist.valueOf(dist) : DEFAULT_DIST;
+          this.discrete = null;
+          this.weights = null;
+        }
+      } else {
+        // Discrete
+        if(discrete==null){
+          throw new JsonMappingException("Instantiation of " + Sample.class
+              + " failed");
+        }
+        mode = Mode.DISC;
+        this.val = 0;
+        this.std = 0;
+        this.dist = null;
+        this.discrete = discrete;
+        if(weights == null){
+          weights = new ArrayList<>(Collections.nCopies(
+              discrete.size(), 1.0));
+        }
+        if(weights.size() != discrete.size()){
+          throw new JsonMappingException("Instantiation of " + Sample.class
+              + " failed");
+        }
+        this.weights = weights;
+      }
     }
-  }
-  @Override
-  public JobStory getNextJob() throws IOException {
-    if (numJobs.decrementAndGet() < 0) {
-      return null;
+    public void init(JDKRandomGenerator random){
+      if(this.rand != null){
+        throw new YarnRuntimeException("init called twice");
+      }
+      this.rand = random;
+      if(mode == Mode.DIST){
+        switch(this.dist){
+        case LOGNORM:
+          this.dist_instance = SynthUtils.getLogNormalDist(rand, val, std);
+          return;
+        case NORM:
+          this.dist_instance = SynthUtils.getNormalDist(rand, val, std);
+          return;
+        default:
+          throw new YarnRuntimeException("Unknown distribution " + dist.name());
+        }
+      }
     }
-    StoryParams storyParams = listStoryParams.poll();
-    return storyParams.pickedJobClass.getJobStory(conf,
-        storyParams.actualSubmissionTime);
-  }
-  @Override
-  public void close() {
-  }
+    public int getInt(){
+      return Math.toIntExact(getLong());
+    }
-  @Override
-  public String toString() {
-    return "SynthTraceJobProducer [ conf=" + conf + ", numJobs=" + numJobs
-        + ", weightList=" + weightList + ", r=" + rand + ", totalWeight="
-        + totalWeight + ", workloads=" + workloads + "]";
-  }
+    public long getLong(){
+      return Math.round(getDouble());
+    }
+
+    public double getDouble(){
+      return Double.parseDouble(getString());
+    }
+
+    public String getString(){
+      if(this.rand == null){
+        throw new YarnRuntimeException("getValue called without init");
+      }
+      switch(mode){
+      case CONST:
+        return Double.toString(val);
+      case DIST:
+        return Double.toString(dist_instance.sample());
+      case DISC:
+        return this.discrete.get(SynthUtils.getWeighted(this.weights, rand));
+      default:
+        throw new YarnRuntimeException("Unknown sampling mode " + mode.name());
+      }
+    }
+
+    @Override
+    public String toString(){
+      switch(mode){
+      case CONST:
+        return "value: " + Double.toString(val);
+      case DIST:
+        return "value: " + this.val + " std: " + this.std + " dist: "
+            + this.dist.name();
+      case DISC:
+        return "discrete: " + this.discrete + ", weights: " + this.weights;
+      default:
+        throw new YarnRuntimeException("Unknown sampling mode " + mode.name());
+      }
+    }
-  public int getNumJobs() {
-    return trace.num_jobs;
   }

+  /**
+   * This is used to define time-varying probability of a job start-time (e.g.,
+   * to simulate daily patterns).
+   */
+  @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
+  public static class TimeSample {
+    // in sec
+    @JsonProperty("time")
+    int time;
+    @JsonProperty("weight")
+    double weight;
+  }
 }
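
The CONST/DIST/DISC modes documented in the Sample constructor above are the heart of the new generic format. A minimal sketch of all three, reusing the Jackson setup and the init/get calls that appear in this patch; the class name and literal values are invented for illustration:

import java.io.IOException;
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Sample;
import org.codehaus.jackson.map.ObjectMapper;

public class SampleModesSketch {
  public static void main(String[] args) throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    JDKRandomGenerator rand = new JDKRandomGenerator();
    rand.setSeed(0);

    // CONST: only "val" is set; sampling always returns it.
    Sample constant = mapper.readValue("{\"val\": 7}", Sample.class);
    // DIST: "val" plus "std"; LOGNORM by default, NORM on request.
    Sample gaussian = mapper.readValue(
        "{\"val\": 7, \"std\": 2, \"dist\": \"NORM\"}", Sample.class);
    // DISC: discrete values, optionally weighted (uniform without "weights").
    Sample discrete = mapper.readValue(
        "{\"discrete\": [\"a\", \"b\"], \"weights\": [3, 1]}", Sample.class);

    constant.init(rand);
    gaussian.init(rand);
    discrete.init(rand);

    System.out.println(constant.getLong());   // always 7
    System.out.println(gaussian.getDouble()); // a normal sample around 7
    System.out.println(discrete.getString()); // "a" about 3x as often as "b"
  }
}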

+ 0 - 121
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java

@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.sls.synthetic;
-
-import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace;
-
-import java.util.*;
-
-/**
- * This class represent a workload (made up of multiple SynthJobClass(es)). It
- * also stores the temporal distributions of jobs in this workload.
- */
-public class SynthWorkload {
-
-  private final int id;
-  private final List<SynthJobClass> classList;
-  private final Trace trace;
-  private final SortedMap<Integer, Double> timeWeights;
-
-  public SynthWorkload(int identifier, Trace inTrace) {
-    classList = new ArrayList<SynthJobClass>();
-    this.id = identifier;
-    this.trace = inTrace;
-    timeWeights = new TreeMap<Integer, Double>();
-    for (SynthTraceJobProducer.TimeSample ts : trace.workloads
-        .get(id).time_distribution) {
-      timeWeights.put(ts.time, ts.jobs);
-    }
-  }
-
-  public boolean add(SynthJobClass s) {
-    return classList.add(s);
-  }
-
-  public List<Double> getWeightList() {
-    ArrayList<Double> ret = new ArrayList<Double>();
-    for (SynthJobClass s : classList) {
-      ret.add(s.getClassWeight());
-    }
-    return ret;
-  }
-
-  public int getId() {
-    return id;
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (!(other instanceof SynthWorkload)) {
-      return false;
-    }
-    // assume ID determines job classes by construction
-    return getId() == ((SynthWorkload) other).getId();
-  }
-
-  @Override
-  public int hashCode() {
-    return getId();
-  }
-
-  @Override
-  public String toString() {
-    return "SynthWorkload " + trace.workloads.get(id).workload_name + "[\n"
-        + classList + "]\n";
-  }
-
-  public String getName() {
-    return trace.workloads.get(id).workload_name;
-  }
-
-  public double getWorkloadWeight() {
-    return trace.workloads.get(id).workload_weight;
-  }
-
-  public String getQueueName() {
-    return trace.workloads.get(id).queue_name;
-  }
-
-  public long getBaseSubmissionTime(Random rand) {
-
-    // pick based on weights the "bucket" for this start time
-    int position = SynthUtils.getWeighted(timeWeights.values(), rand);
-
-    int[] time = new int[timeWeights.keySet().size()];
-    int index = 0;
-    for (Integer i : timeWeights.keySet()) {
-      time[index++] = i;
-    }
-
-    // uniformly pick a time between start and end time of this bucket
-    int startRange = time[position];
-    int endRange = startRange;
-    // if there is no subsequent bucket pick startRange
-    if (position < timeWeights.keySet().size() - 1) {
-      endRange = time[position + 1];
-      return startRange + rand.nextInt((endRange - startRange));
-    } else {
-      return startRange;
-    }
-  }
-
-  public List<SynthJobClass> getClassList() {
-    return classList;
-  }
-
-}
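
The bucket logic deleted here survives, reshaped, as Workload.generateSubmissionTime in the new producer: pick a time bucket with probability proportional to its weight, then pick uniformly inside the bucket. A compact restatement of the shared idea; the helper name is hypothetical, and it assumes only that SynthUtils.getWeighted returns an index with probability proportional to its weight:

// Weighted bucket pick, then a uniform offset within the bucket.
// 'times' holds the sorted bucket start times from "time_distribution";
// 'weights' holds the matching per-bucket weights.
static long sampleSubmissionTime(int[] times, List<Double> weights,
    Random rand) {
  int i = SynthUtils.getWeighted(weights, rand);
  int start = times[i];
  // The next bucket's start bounds this one; the last bucket has zero width.
  int end = (i + 1 < times.length) ? times[i + 1] : start;
  int range = end - start;
  return start + (range > 0 ? rand.nextInt(range) : 0);
}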

+ 1 - 1
hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java

@@ -125,7 +125,7 @@ public abstract class BaseSLSRunnerTest {
       if (!exceptionList.isEmpty()) {
         sls.stop();
         Assert.fail("TestSLSRunner catched exception from child thread "
-            + "(TaskRunner.Task): " + exceptionList);
+            + "(TaskRunner.TaskDefinition): " + exceptionList);
         break;
       }
       timeout--;

+ 76 - 0
hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java

@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * This test performs simple runs of the SLS with the generic syn json format.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+public class TestSLSGenericSynth extends BaseSLSRunnerTest {
+
+  @Parameters(name = "Testing with: {1}, {0}, (nodeFile {3})")
+  public static Collection<Object[]> data() {
+
+    String capScheduler = CapacityScheduler.class.getCanonicalName();
+    String fairScheduler = FairScheduler.class.getCanonicalName();
+    String synthTraceFile = "src/test/resources/syn_generic.json";
+    String nodeFile = "src/test/resources/nodes.json";
+
+    // Test with both schedulers
+    return Arrays.asList(new Object[][] {
+
+        // covering the no nodeFile case
+        {capScheduler, "SYNTH", synthTraceFile, null },
+
+        // covering new commandline and CapacityScheduler
+        {capScheduler, "SYNTH", synthTraceFile, nodeFile },
+
+        // covering FairScheduler
+        {fairScheduler, "SYNTH", synthTraceFile, nodeFile },
+    });
+  }
+
+  @Before
+  public void setup() {
+    ongoingInvariantFile = "src/test/resources/ongoing-invariants.txt";
+    exitInvariantFile = "src/test/resources/exit-invariants.txt";
+  }
+
+  @Test(timeout = 90000)
+  @SuppressWarnings("all")
+  public void testSimulatorRunning() throws Exception {
+    Configuration conf = new Configuration(false);
+    long timeTillShutdownInsec = 20L;
+    runSLS(conf, timeTillShutdownInsec);
+  }
+}

+ 76 - 0
hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java

@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * This test performs simple runs of the SLS with the stream syn json format.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+public class TestSLSStreamAMSynth extends BaseSLSRunnerTest {
+
+  @Parameters(name = "Testing with: {1}, {0}, (nodeFile {3})")
+  public static Collection<Object[]> data() {
+
+    String capScheduler = CapacityScheduler.class.getCanonicalName();
+    String fairScheduler = FairScheduler.class.getCanonicalName();
+    String synthTraceFile = "src/test/resources/syn_stream.json";
+    String nodeFile = "src/test/resources/nodes.json";
+
+    // Test with both schedulers
+    return Arrays.asList(new Object[][] {
+
+        // covering the no nodeFile case
+        {capScheduler, "SYNTH", synthTraceFile, null },
+
+        // covering new commandline and CapacityScheduler
+        {capScheduler, "SYNTH", synthTraceFile, nodeFile },
+
+        // covering FairScheduler
+        {fairScheduler, "SYNTH", synthTraceFile, nodeFile },
+    });
+  }
+
+  @Before
+  public void setup() {
+    ongoingInvariantFile = "src/test/resources/ongoing-invariants.txt";
+    exitInvariantFile = "src/test/resources/exit-invariants.txt";
+  }
+
+  @Test(timeout = 90000)
+  @SuppressWarnings("all")
+  public void testSimulatorRunning() throws Exception {
+    Configuration conf = new Configuration(false);
+    long timeTillShutdownInsec = 20L;
+    runSLS(conf, timeTillShutdownInsec);
+  }
+}

+ 188 - 25
hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java

@@ -17,20 +17,25 @@
  */
 package org.apache.hadoop.yarn.sls;

+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
 import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
 import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.log4j.Logger;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.io.IOException;
+import java.util.Arrays;

 import static org.junit.Assert.assertTrue;

+import static org.codehaus.jackson.JsonParser.Feature.INTERN_FIELD_NAMES;
+import static org.codehaus.jackson.map.DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES;
+
 /**
  * Simple test class driving the {@code SynthTraceJobProducer}, and validating
  * jobs produced are within expected range.
@@ -38,10 +43,60 @@ import static org.junit.Assert.assertTrue;
 public class TestSynthJobGeneration {

   public final static Logger LOG =
-      Logger.getLogger(TestSynthJobGeneration.class);
+      LoggerFactory.getLogger(TestSynthJobGeneration.class);

   @Test
-  public void test() throws IllegalArgumentException, IOException {
+  public void testWorkloadGenerateTime()
+      throws IllegalArgumentException, IOException {
+
+    String workloadJson = "{\"job_classes\": [], \"time_distribution\":["
+        + "{\"time\": 0, \"weight\": 1}, " + "{\"time\": 30, \"weight\": 0},"
+        + "{\"time\": 60, \"weight\": 2}," + "{\"time\": 90, \"weight\": 1}"
+        + "]}";
+
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.configure(INTERN_FIELD_NAMES, true);
+    mapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
+    SynthTraceJobProducer.Workload wl =
+        mapper.readValue(workloadJson, SynthTraceJobProducer.Workload.class);
+
+    JDKRandomGenerator rand = new JDKRandomGenerator();
+    rand.setSeed(0);
+
+    wl.init(rand);
+
+    int bucket0 = 0;
+    int bucket1 = 0;
+    int bucket2 = 0;
+    int bucket3 = 0;
+    for (int i = 0; i < 1000; ++i) {
+      long time = wl.generateSubmissionTime();
+      LOG.info("Generated time " + time);
+      if (time < 30) {
+        bucket0++;
+      } else if (time < 60) {
+        bucket1++;
+      } else if (time < 90) {
+        bucket2++;
+      } else {
+        bucket3++;
+      }
+    }
+
+    Assert.assertTrue(bucket0 > 0);
+    Assert.assertTrue(bucket1 == 0);
+    Assert.assertTrue(bucket2 > 0);
+    Assert.assertTrue(bucket3 > 0);
+    Assert.assertTrue(bucket2 > bucket0);
+    Assert.assertTrue(bucket2 > bucket3);
+
+    LOG.info("bucket0 {}, bucket1 {}, bucket2 {}, bucket3 {}", bucket0, bucket1,
+        bucket2, bucket3);
+
+  }
+
+  @Test
+  public void testMapReduce() throws IllegalArgumentException, IOException {

     Configuration conf = new Configuration();

@@ -50,47 +105,155 @@ public class TestSynthJobGeneration {

     SynthTraceJobProducer stjp = new SynthTraceJobProducer(conf);

+    LOG.info(stjp.toString());
+
     SynthJob js = (SynthJob) stjp.getNextJob();

     int jobCount = 0;

     while (js != null) {
-      LOG.info((jobCount++) + " " + js.getQueueName() + " -- "
-          + js.getJobClass().getClassName() + " (conf: "
-          + js.getJobConf().get(MRJobConfig.QUEUE_NAME) + ") " + " submission: "
-          + js.getSubmissionTime() + ", " + " duration: " + js.getDuration()
-          + " numMaps: " + js.getNumberMaps() + " numReduces: "
-          + js.getNumberReduces());
+      LOG.info(js.toString());
+      validateJob(js);
+      js = (SynthJob) stjp.getNextJob();
+      jobCount++;
+    }
+    Assert.assertEquals(stjp.getNumJobs(), jobCount);
+  }
+
+  @Test
+  public void testGeneric() throws IllegalArgumentException, IOException {
+    Configuration conf = new Configuration();
+
+    conf.set(SynthTraceJobProducer.SLS_SYNTHETIC_TRACE_FILE,
+        "src/test/resources/syn_generic.json");
+
+    SynthTraceJobProducer stjp = new SynthTraceJobProducer(conf);
+
+    LOG.info(stjp.toString());
+
+    SynthJob js = (SynthJob) stjp.getNextJob();
+
+    int jobCount = 0;
+
+    while (js != null) {
+      LOG.info(js.toString());
       validateJob(js);
       js = (SynthJob) stjp.getNextJob();
+      jobCount++;
     }

     Assert.assertEquals(stjp.getNumJobs(), jobCount);
   }

-  private void validateJob(SynthJob js) {
+  @Test
+  public void testStream() throws IllegalArgumentException, IOException {
+    Configuration conf = new Configuration();
-    assertTrue(js.getSubmissionTime() > 0);
-    assertTrue(js.getDuration() > 0);
-    assertTrue(js.getNumberMaps() >= 0);
-    assertTrue(js.getNumberReduces() >= 0);
-    assertTrue(js.getNumberMaps() + js.getNumberReduces() > 0);
-    assertTrue(js.getTotalSlotTime() >= 0);
+    conf.set(SynthTraceJobProducer.SLS_SYNTHETIC_TRACE_FILE,
+        "src/test/resources/syn_stream.json");
+
+    SynthTraceJobProducer stjp = new SynthTraceJobProducer(conf);
+
+    LOG.info(stjp.toString());
+
+    SynthJob js = (SynthJob) stjp.getNextJob();
+
+    int jobCount = 0;
+
+    while (js != null) {
+      LOG.info(js.toString());
+      validateJob(js);
+      js = (SynthJob) stjp.getNextJob();
+      jobCount++;
+    }
+
+    Assert.assertEquals(stjp.getNumJobs(), jobCount);
+  }
-    for (int i = 0; i < js.getNumberMaps(); i++) {
-      TaskAttemptInfo tai = js.getTaskAttemptInfo(TaskType.MAP, i, 0);
-      assertTrue(tai.getRuntime() > 0);
+  @Test
+  public void testSample() throws IOException {
+
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.configure(INTERN_FIELD_NAMES, true);
+    mapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+    JDKRandomGenerator rand = new JDKRandomGenerator();
+    rand.setSeed(0);
+
+    String valJson = "{\"val\" : 5 }";
+    SynthTraceJobProducer.Sample valSample =
+        mapper.readValue(valJson, SynthTraceJobProducer.Sample.class);
+    valSample.init(rand);
+    int val = valSample.getInt();
+    Assert.assertEquals(5, val);
+
+    String distJson = "{\"val\" : 5, \"std\" : 1 }";
+    SynthTraceJobProducer.Sample distSample =
+        mapper.readValue(distJson, SynthTraceJobProducer.Sample.class);
+    distSample.init(rand);
+    double dist = distSample.getDouble();
+    Assert.assertTrue(dist > 2 && dist < 8);
+
+    String normdistJson = "{\"val\" : 5, \"std\" : 1, \"dist\": \"NORM\" }";
+    SynthTraceJobProducer.Sample normdistSample =
+        mapper.readValue(normdistJson, SynthTraceJobProducer.Sample.class);
+    normdistSample.init(rand);
+    double normdist = normdistSample.getDouble();
+    Assert.assertTrue(normdist > 2 && normdist < 8);
+
+    String discreteJson = "{\"discrete\" : [2, 4, 6, 8]}";
+    SynthTraceJobProducer.Sample discreteSample =
+        mapper.readValue(discreteJson, SynthTraceJobProducer.Sample.class);
+    discreteSample.init(rand);
+    int discrete = discreteSample.getInt();
+    Assert.assertTrue(
+        Arrays.asList(new Integer[] {2, 4, 6, 8}).contains(discrete));
+
+    String discreteWeightsJson =
+        "{\"discrete\" : [2, 4, 6, 8], " + "\"weights\": [0, 0, 0, 1]}";
+    SynthTraceJobProducer.Sample discreteWeightsSample = mapper
+        .readValue(discreteWeightsJson, SynthTraceJobProducer.Sample.class);
+    discreteWeightsSample.init(rand);
+    int discreteWeights = discreteWeightsSample.getInt();
+    Assert.assertEquals(8, discreteWeights);
+
+    String invalidJson = "{\"val\" : 5, \"discrete\" : [2, 4, 6, 8], "
+        + "\"weights\": [0, 0, 0, 1]}";
+    try {
+      mapper.readValue(invalidJson, SynthTraceJobProducer.Sample.class);
+      Assert.fail();
+    } catch (JsonMappingException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Instantiation of"));
     }

-    for (int i = 0; i < js.getNumberReduces(); i++) {
-      TaskAttemptInfo tai = js.getTaskAttemptInfo(TaskType.REDUCE, i, 0);
-      assertTrue(tai.getRuntime() > 0);
+    String invalidDistJson =
+        "{\"val\" : 5, \"std\" : 1, " + "\"dist\": \"INVALID\" }";
+    try {
+      mapper.readValue(invalidDistJson, SynthTraceJobProducer.Sample.class);
+      Assert.fail();
+    } catch (JsonMappingException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Instantiation of"));
     }
+  }
+
+  private void validateJob(SynthJob js) {
+
+    assertTrue(js.getSubmissionTime() > 0);
+    assertTrue(js.getDuration() > 0);
+    assertTrue(js.getTotalSlotTime() >= 0);

     if (js.hasDeadline()) {
       assertTrue(js.getDeadline() > js.getSubmissionTime() + js.getDuration());
     }

+    assertTrue(js.getTasks().size() > 0);
+
+    for (SynthJob.SynthTask t : js.getTasks()) {
+      assertTrue(t.getType() != null);
+      assertTrue(t.getTime() > 0);
+      assertTrue(t.getMemory() > 0);
+      assertTrue(t.getVcores() > 0);
+    }
   }
 }
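
The bucket assertions in testWorkloadGenerateTime above follow directly from the declared weights: with weights {1, 0, 2, 1} over buckets starting at 0, 30, 60 and 90, the 1000 draws split in proportion 1:0:2:1. The expected counts, spelled out:

// Expected long-run split of the 1000 draws for weights {1, 0, 2, 1}:
double total = 1 + 0 + 2 + 1;      // 4.0
double exp0 = 1000 * (1 / total);  // ~250 draws in [0, 30)
double exp1 = 1000 * (0 / total);  // exactly 0 draws in [30, 60)
double exp2 = 1000 * (2 / total);  // ~500 draws in [60, 90)
double exp3 = 1000 * (1 / total);  // ~250 draws at 90, the zero-width last bucket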

+ 1 - 1
hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java

@@ -139,7 +139,7 @@ public class TestAMSimulator {
     String queue = "default";
     List<ContainerSimulator> containers = new ArrayList<>();
     app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
-        appId, 0, SLSConfiguration.getAMContainerResource(conf));
+        appId, 0, SLSConfiguration.getAMContainerResource(conf), null);
     app.firstStep();

     verifySchedulerMetrics(appId);

+ 4 - 0
hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml

@@ -45,6 +45,10 @@
     <name>yarn.sls.am.type.mapreduce</name>
     <value>org.apache.hadoop.yarn.sls.appmaster.MRAMSimulator</value>
   </property>
+  <property>
+    <name>yarn.sls.am.type.stream</name>
+    <value>org.apache.hadoop.yarn.sls.appmaster.StreamAMSimulator</value>
+  </property>

   <!-- Containers configuration -->
   <property>
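
The new property completes the wiring for the trace's "type" field: a JobDefinition declaring "type": "stream" resolves to StreamAMSimulator through the yarn.sls.am.type.<type> key. A hedged sketch of that lookup; the variable names are illustrative, and the patch's actual resolution lives in SLSRunner.java, not shown here:

// amType is the "type" field of a synthetic job, e.g. "mapreduce" or "stream".
String amType = "stream";
Class<?> amClass = conf.getClass("yarn.sls.am.type." + amType, null);
if (amClass == null) {
  // a type with no registered simulator cannot be run
  throw new IllegalArgumentException("No AM simulator for type " + amType);
}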

+ 1 - 1
hadoop-tools/hadoop-sls/src/test/resources/syn.json

@@ -45,7 +45,7 @@
         },
         {
           "time": 60,
-          "jobs": 0
+          "weight": 0
         }
       ]
     }

+ 54 - 0
hadoop-tools/hadoop-sls/src/test/resources/syn_generic.json

@@ -0,0 +1,54 @@
+{
+  "description": "tiny jobs workload",
+  "num_nodes": 20,
+  "nodes_per_rack": 4,
+  "num_jobs": 10,
+  "rand_seed": 2,
+  "workloads": [
+    {
+      "workload_name": "tiny-test",
+      "workload_weight": 0.5,
+      "description": "Sort jobs",
+      "queue_name": "sls_queue_1",
+      "job_classes": [
+        {
+          "class_name": "class_1",
+          "user_name": "foobar",
+          "class_weight": 1.0,
+          "type": "mapreduce",
+          "deadline_factor": {"val": 10},
+          "duration": {"val": 60, "std": 5},
+          "reservation": {"val": 0.5},
+          "tasks":[
+            {
+              "type": "map",
+              "priority": 20,
+              "count": { "val": 5, "std": 1},
+              "time": {"val": 10, "std": 2},
+              "max_memory": {"val": 1024},
+              "max_vcores": {"val": 1}
+            },
+            {
+              "type": "reduce",
+              "priority": 10,
+              "count": { "val": 5, "std": 1},
+              "time": {"val": 20, "std": 4},
+              "max_memory": {"val": 2048},
+              "max_vcores": {"val": 2}
+            }
+          ]
+        }
+      ],
+      "time_distribution": [
+        {
+          "time": 1,
+          "weight": 100
+        },
+        {
+          "time": 60,
+          "weight": 0
+        }
+      ]
+    }
+  ]
+}
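
A trace in this format can be exercised on its own, without the full simulator, exactly as the new testGeneric does; a minimal driver, with the path taken from the test resources:

Configuration conf = new Configuration();
conf.set(SynthTraceJobProducer.SLS_SYNTHETIC_TRACE_FILE,
    "src/test/resources/syn_generic.json");
SynthTraceJobProducer stjp = new SynthTraceJobProducer(conf);

// getNextJob() returns jobs ordered by their generated submission time,
// and null once num_jobs (10 here) have been produced.
SynthJob job = (SynthJob) stjp.getNextJob();
while (job != null) {
  System.out.println(job);
  job = (SynthJob) stjp.getNextJob();
}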

+ 46 - 0
hadoop-tools/hadoop-sls/src/test/resources/syn_stream.json

@@ -0,0 +1,46 @@
+{
+  "description": "stream workload",
+  "num_nodes": 20,
+  "nodes_per_rack": 4,
+  "num_jobs": 5,
+  "rand_seed": 2,
+  "workloads": [
+    {
+      "workload_name": "tiny-test",
+      "workload_weight": 1,
+      "description": "long lived streaming jobs",
+      "queue_name": "sls_queue_1",
+      "job_classes": [
+        {
+          "class_name": "class_1",
+          "user_name": "foobar",
+          "class_weight": 1.0,
+          "type": "stream",
+          "deadline_factor": {"val": 10},
+          "duration": {"val": 30, "std": 5},
+          "reservation": {"val": 0.5},
+          "tasks":[
+            {
+              "type": "stream",
+              "priority": 20,
+              "count": { "val": 2},
+              "time": {"val": 60000},
+              "max_memory": {"val": 4096},
+              "max_vcores": {"val": 4}
+            }
+          ]
+        }
+      ],
+      "time_distribution": [
+        {
+          "time": 1,
+          "weight": 100
+        },
+        {
+          "time": 2,
+          "weight": 0
+        }
+      ]
+    }
+  ]
+}