@@ -36,8 +36,8 @@ import java.util.concurrent.locks.Condition;
 public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
   public static final Log LOG = LogFactory.getLog(StressJobFactory.class);
 
-  private LoadStatus loadStatus = new LoadStatus();
-  private final Condition overloaded = this.lock.newCondition();
+  private final LoadStatus loadStatus = new LoadStatus();
+  private final Condition condUnderloaded = this.lock.newCondition();
   /**
    * The minimum ratio between pending+running map tasks (aka. incomplete map
    * tasks) and cluster map slot capacity for us to consider the cluster is
@@ -46,6 +46,27 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
    */
   static final float OVERLOAD_MAPTASK_MAPSLOT_RATIO = 2.0f;
 
+  /**
+   * The minimum ratio between pending+running reduce tasks (aka. incomplete
+   * reduce tasks) and cluster reduce slot capacity for us to consider the
+   * cluster is overloaded. For running reduces, we only count them partially.
+   * Namely, a 40% completed reduce is counted as 0.6 reduce tasks in our
+   * calculation.
+   */
+  static final float OVERLOAD_REDUCETASK_REDUCESLOT_RATIO = 2.5f;
+
+  /**
+   * The maximum share of the cluster's mapslot capacity that can be counted
+   * toward a job's incomplete map tasks in overload calculation.
+   */
+  static final float MAX_MAPSLOT_SHARE_PER_JOB = 0.1f;
+
+  /**
+   * The maximum share of the cluster's reduceslot capacity that can be counted
+   * toward a job's incomplete reduce tasks in overload calculation.
+   */
+  static final float MAX_REDUCESLOT_SHARE_PER_JOB = 0.1f;
+
   /**
    * Creating a new instance does not start the thread.
    *
@@ -63,11 +84,6 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
       throws IOException {
     super(
       submitter, jobProducer, scratch, conf, startFlag, resolver);
-
-    //Setting isOverloaded as true , now JF would wait for atleast first
-    //set of ClusterStats based on which it can decide how many job it has
-    //to submit.
-    this.loadStatus.isOverloaded = true;
   }
 
   public Thread createReaderThread() {
@@ -104,33 +120,37 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
         while (!Thread.currentThread().isInterrupted()) {
           lock.lock();
           try {
-            while (loadStatus.isOverloaded) {
+            while (loadStatus.overloaded()) {
               //Wait while JT is overloaded.
               try {
-                overloaded.await();
+                condUnderloaded.await();
               } catch (InterruptedException ie) {
                 return;
               }
             }
 
-            int noOfSlotsAvailable = loadStatus.numSlotsBackfill;
-            LOG.info("No of slots to be backfilled are " + noOfSlotsAvailable);
-
-            for (int i = 0; i < noOfSlotsAvailable; i++) {
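+            // Keep submitting jobs until the backfill counters indicate the
+            // cluster would become overloaded.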
+            while (!loadStatus.overloaded()) {
              try {
                final JobStory job = getNextJobFiltered();
                if (null == job) {
                  return;
                }
-                //TODO: We need to take care of scenario when one map takes more
-                //than 1 slot.
-                i += job.getNumberMaps();
-
+
                submitter.add(
                  jobCreator.createGridmixJob(
                    conf, 0L, job, scratch, userResolver.getTargetUgi(
                      UserGroupInformation.createRemoteUser(
                        job.getUser())), sequence.getAndIncrement()));
+                // TODO: We need to take care of scenario when one map/reduce
+                // takes more than 1 slot.
+                loadStatus.mapSlotsBackfill -=
+                  calcEffectiveIncompleteMapTasks(
+                    loadStatus.mapSlotCapacity, job.getNumberMaps(), 0.0f);
+                loadStatus.reduceSlotsBackfill -=
+                  calcEffectiveIncompleteReduceTasks(
+                    loadStatus.reduceSlotCapacity, job.getNumberReduces(),
+                    0.0f);
+                --loadStatus.numJobsBackfill;
              } catch (IOException e) {
                LOG.error("Error while submitting the job ", e);
                error = e;
@@ -166,12 +186,33 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
       } catch (IOException e) {
         LOG.error("Couldn't get the new Status",e);
       }
-      overloaded.signalAll();
+      if (!loadStatus.overloaded()) {
+        condUnderloaded.signalAll();
+      }
     } finally {
       lock.unlock();
     }
   }
 
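+  /**
+   * Estimate how many incomplete map tasks a single job contributes to the
+   * overload calculation: its remaining (unfinished) maps, capped at
+   * MAX_MAPSLOT_SHARE_PER_JOB of the cluster's map slot capacity (but never
+   * less than one task).
+   */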
+  static float calcEffectiveIncompleteMapTasks(int mapSlotCapacity,
+      int numMaps, float mapProgress) {
+    float maxEffIncompleteMapTasks = Math.max(1.0f, mapSlotCapacity
+        * MAX_MAPSLOT_SHARE_PER_JOB);
+    float mapProgressAdjusted = Math.max(Math.min(mapProgress, 1.0f), 0.0f);
+    return Math.min(maxEffIncompleteMapTasks, numMaps
+        * (1.0f - mapProgressAdjusted));
+  }
+
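+  /**
+   * Estimate how many incomplete reduce tasks a single job contributes to
+   * the overload calculation: its remaining (unfinished) reduces, capped at
+   * MAX_REDUCESLOT_SHARE_PER_JOB of the cluster's reduce slot capacity (but
+   * never less than one task).
+   */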
+  static float calcEffectiveIncompleteReduceTasks(int reduceSlotCapacity,
+      int numReduces, float reduceProgress) {
+    float maxEffIncompleteReduceTasks = Math.max(1.0f, reduceSlotCapacity
+        * MAX_REDUCESLOT_SHARE_PER_JOB);
+    float reduceProgressAdjusted = Math.max(Math.min(reduceProgress, 1.0f),
+        0.0f);
+    return Math.min(maxEffIncompleteReduceTasks, numReduces
+        * (1.0f - reduceProgressAdjusted));
+  }
+
   /**
    * We try to use some light-weight mechanism to determine cluster load.
    *
@@ -181,62 +222,98 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
    */
   private void checkLoadAndGetSlotsToBackfill(
     ClusterStats stats, ClusterStatus clusterStatus) throws IOException {
-    // If there are more jobs than number of task trackers, we assume the
-    // cluster is overloaded.
-    if (stats.getNumRunningJob() >= clusterStatus.getTaskTrackers()) {
+    loadStatus.mapSlotCapacity = clusterStatus.getMaxMapTasks();
+    loadStatus.reduceSlotCapacity = clusterStatus.getMaxReduceTasks();
+
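+    // Consider the cluster overloaded when there are at least as many
+    // running jobs as task trackers, i.e. no jobs can be backfilled.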
+    loadStatus.numJobsBackfill = clusterStatus.getTaskTrackers()
+        - stats.getNumRunningJob();
+    if (loadStatus.numJobsBackfill <= 0) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug(
-          System.currentTimeMillis() + " Overloaded is " +
-            Boolean.TRUE.toString() + " #runningJobs >= taskTrackerCount (" +
-            stats.getNumRunningJob() + " >= " +
-            clusterStatus.getTaskTrackers() + " )\n");
+        LOG.debug(System.currentTimeMillis() + " Overloaded is "
+            + Boolean.TRUE.toString() + " NumJobsBackfill is "
+            + loadStatus.numJobsBackfill);
       }
-      loadStatus.isOverloaded = true;
-      loadStatus.numSlotsBackfill = 0;
-      return;
+      return; // stop calculation because we know it is overloaded.
     }
 
     float incompleteMapTasks = 0; // include pending & running map tasks.
     for (JobStats job : ClusterStats.getRunningJobStats()) {
       float mapProgress = job.getJob().mapProgress();
       int noOfMaps = job.getNoOfMaps();
-      incompleteMapTasks += (1 - Math.min(mapProgress,1.0))* noOfMaps;
+      incompleteMapTasks += calcEffectiveIncompleteMapTasks(clusterStatus
+          .getMaxMapTasks(), noOfMaps, mapProgress);
+    }
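+    // Remaining map-side headroom: how many more incomplete map tasks the
+    // cluster can absorb before reaching OVERLOAD_MAPTASK_MAPSLOT_RATIO
+    // times its map slot capacity.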
+    loadStatus.mapSlotsBackfill = (int) (OVERLOAD_MAPTASK_MAPSLOT_RATIO
+        * clusterStatus.getMaxMapTasks() - incompleteMapTasks);
+    if (loadStatus.mapSlotsBackfill <= 0) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(System.currentTimeMillis() + " Overloaded is "
+            + Boolean.TRUE.toString() + " MapSlotsBackfill is "
+            + loadStatus.mapSlotsBackfill);
+      }
+      return; // stop calculation because we know it is overloaded.
     }
 
-    float overloadedThreshold =
-      OVERLOAD_MAPTASK_MAPSLOT_RATIO * clusterStatus.getMaxMapTasks();
-    boolean overloaded = incompleteMapTasks > overloadedThreshold;
-    String relOp = (overloaded) ? ">" : "<=";
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(
-        System.currentTimeMillis() + " Overloaded is " + Boolean.toString(
-          overloaded) + " incompleteMapTasks " + relOp + " " +
-          OVERLOAD_MAPTASK_MAPSLOT_RATIO + "*mapSlotCapacity" + "(" +
-          incompleteMapTasks + " " + relOp + " " +
-          OVERLOAD_MAPTASK_MAPSLOT_RATIO + "*" +
-          clusterStatus.getMaxMapTasks() + ")");
+    float incompleteReduceTasks = 0; // include pending & running reduce tasks.
+    for (JobStats job : ClusterStats.getRunningJobStats()) {
+      int noOfReduces = job.getJob().getNumReduceTasks();
+      if (noOfReduces > 0) {
+        float reduceProgress = job.getJob().reduceProgress();
+        incompleteReduceTasks += calcEffectiveIncompleteReduceTasks(
+            clusterStatus.getMaxReduceTasks(), noOfReduces, reduceProgress);
+      }
     }
-    if (overloaded) {
-      loadStatus.isOverloaded = true;
-      loadStatus.numSlotsBackfill = 0;
-    } else {
-      loadStatus.isOverloaded = false;
-      loadStatus.numSlotsBackfill =
-        (int) (overloadedThreshold - incompleteMapTasks);
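+    // Remaining reduce-side headroom, relative to
+    // OVERLOAD_REDUCETASK_REDUCESLOT_RATIO times the reduce slot capacity.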
+    loadStatus.reduceSlotsBackfill = (int) (OVERLOAD_REDUCETASK_REDUCESLOT_RATIO
+        * clusterStatus.getMaxReduceTasks() - incompleteReduceTasks);
+    if (loadStatus.reduceSlotsBackfill <= 0) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(System.currentTimeMillis() + " Overloaded is "
+            + Boolean.TRUE.toString() + " ReduceSlotsBackfill is "
+            + loadStatus.reduceSlotsBackfill);
+      }
+      return; // stop calculation because we know it is overloaded.
     }
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Current load Status is " + loadStatus);
+      LOG.debug(System.currentTimeMillis() + " Overloaded is "
+          + Boolean.FALSE.toString() + " Current load Status is " + loadStatus);
     }
   }
 
   static class LoadStatus {
-    boolean isOverloaded = false;
-    int numSlotsBackfill = -1;
+    int mapSlotsBackfill;
+    int mapSlotCapacity;
+    int reduceSlotsBackfill;
+    int reduceSlotCapacity;
+    int numJobsBackfill;
+
+    /**
+     * Construct the LoadStatus in an unknown state - assuming the cluster is
+     * overloaded by setting all the backfill counters to 0.
+     */
+    LoadStatus() {
+      mapSlotsBackfill = 0;
+      reduceSlotsBackfill = 0;
+      numJobsBackfill = 0;
+
+      mapSlotCapacity = -1;
+      reduceSlotCapacity = -1;
+    }
+
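+    /**
+     * The cluster is considered overloaded as soon as any backfill counter
+     * (jobs, map slots or reduce slots) drops to zero or below.
+     */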
+    public boolean overloaded() {
+      return (mapSlotsBackfill <= 0) || (reduceSlotsBackfill <= 0)
+          || (numJobsBackfill <= 0);
+    }
+
     public String toString() {
-      return " is Overloaded " + isOverloaded + " no of slots available " +
-        numSlotsBackfill;
+      return " Overloaded = " + overloaded()
+          + ", MapSlotBackfill = " + mapSlotsBackfill
+          + ", MapSlotCapacity = " + mapSlotCapacity
+          + ", ReduceSlotBackfill = " + reduceSlotsBackfill
+          + ", ReduceSlotCapacity = " + reduceSlotCapacity
+          + ", NumJobsBackfill = " + numJobsBackfill
+          ;
     }
   }