|
@@ -243,7 +243,7 @@ public class JobInitializationPoller extends Thread {
|
|
|
* Set of jobs which have been passed to Initialization threads.
|
|
|
* This is maintained so that we dont call initTasks() for same job twice.
|
|
|
*/
|
|
|
- private HashSet<JobID> initializedJobs;
|
|
|
+ private HashMap<JobID,JobInProgress> initializedJobs;
|
|
|
|
|
|
private volatile boolean running;
|
|
|
|
|
@@ -255,7 +255,7 @@ public class JobInitializationPoller extends Thread {
|
|
|
|
|
|
public JobInitializationPoller(JobQueuesManager mgr,
|
|
|
CapacitySchedulerConf rmConf, Set<String> queue) {
|
|
|
- initializedJobs = new HashSet<JobID>();
|
|
|
+ initializedJobs = new HashMap<JobID,JobInProgress>();
|
|
|
jobQueues = new HashMap<String, QueueInfo>();
|
|
|
this.jobQueueManager = mgr;
|
|
|
threadsToQueueMap = new HashMap<String, JobInitializationThread>();
|
|
@@ -293,9 +293,20 @@ public class JobInitializationPoller extends Thread {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * This is main thread of initialization poller, We essentially do
|
|
|
+ * following in the main threads:
|
|
|
+ *
|
|
|
+ * <ol>
|
|
|
+ * <li> Clean up the list of initialized jobs list which poller maintains
|
|
|
+ * </li>
|
|
|
+ * <li> Select jobs to initialize in the polling interval.</li>
|
|
|
+ * </ol>
|
|
|
+ */
|
|
|
public void run() {
|
|
|
while (running) {
|
|
|
try {
|
|
|
+ cleanUpInitializedJobsList();
|
|
|
selectJobsToInitialize();
|
|
|
if (!this.isInterrupted()) {
|
|
|
Thread.sleep(sleepInterval);
|
|
@@ -307,17 +318,18 @@ public class JobInitializationPoller extends Thread {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // The key method that picks up jobs to initialize for each queue.
|
|
|
- // The jobs picked up are added to the worker thread that is handling
|
|
|
- // initialization for that queue.
|
|
|
- // The method is package private to allow tests to call it synchronously
|
|
|
- // in a controlled manner.
|
|
|
+ /**
|
|
|
+ * The key method which does selecting jobs to be initalized across
|
|
|
+ * queues and assign those jobs to their appropriate init-worker threads.
|
|
|
+ * <br/>
|
|
|
+ * This method is overriden in test case which is used to test job
|
|
|
+ * initialization poller.
|
|
|
+ *
|
|
|
+ */
|
|
|
void selectJobsToInitialize() {
|
|
|
for (String queue : jobQueues.keySet()) {
|
|
|
ArrayList<JobInProgress> jobsToInitialize = getJobsToInitialize(queue);
|
|
|
- //if (LOG.isDebugEnabled()) {
|
|
|
- printJobs(jobsToInitialize);
|
|
|
- //}
|
|
|
+ printJobs(jobsToInitialize);
|
|
|
JobInitializationThread t = threadsToQueueMap.get(queue);
|
|
|
for (JobInProgress job : jobsToInitialize) {
|
|
|
t.addJobsToQueue(queue, job);
|
|
@@ -325,6 +337,13 @@ public class JobInitializationPoller extends Thread {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Method used to print log statements about which jobs are being
|
|
|
+ * passed to init-threads.
|
|
|
+ *
|
|
|
+ * @param jobsToInitialize list of jobs which are passed to be
|
|
|
+ * init-threads.
|
|
|
+ */
|
|
|
private void printJobs(ArrayList<JobInProgress> jobsToInitialize) {
|
|
|
for (JobInProgress job : jobsToInitialize) {
|
|
|
LOG.info("Passing to Initializer Job Id :" + job.getJobID()
|
|
@@ -333,13 +352,25 @@ public class JobInitializationPoller extends Thread {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // This method exists to be overridden by test cases that wish to
|
|
|
- // create a test-friendly worker thread which can be controlled
|
|
|
- // synchronously.
|
|
|
+ /**
|
|
|
+ * This method exists to be overridden by test cases that wish to
|
|
|
+ * create a test-friendly worker thread which can be controlled
|
|
|
+ * synchronously.
|
|
|
+ *
|
|
|
+ * @return Instance of worker init-threads.
|
|
|
+ */
|
|
|
JobInitializationThread createJobInitializationThread() {
|
|
|
return new JobInitializationThread();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Method which is used by the poller to assign appropriate worker thread
|
|
|
+ * to a queue. The number of threads would be always less than or equal
|
|
|
+ * to number of queues in a system. If number of threads is configured to
|
|
|
+ * be more than number of queues then poller does not create threads more
|
|
|
+ * than number of queues.
|
|
|
+ *
|
|
|
+ */
|
|
|
private void assignThreadsToQueues() {
|
|
|
int countOfQueues = jobQueues.size();
|
|
|
String[] queues = (String[]) jobQueues.keySet().toArray(
|
|
@@ -369,14 +400,38 @@ public class JobInitializationPoller extends Thread {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /*
|
|
|
- * Select jobs to be initialized for a given queue.
|
|
|
+ /**
|
|
|
+ *
|
|
|
+ * Method used to select jobs to be initialized for a given queue. <br/>
|
|
|
+ *
|
|
|
+ * We want to ensure that enough jobs have been initialized, so that when the
|
|
|
+ * Scheduler wants to consider a new job to run, it's ready. We clearly don't
|
|
|
+ * want to initialize too many jobs as each initialized job has a memory
|
|
|
+ * footprint, sometimes significant.
|
|
|
+ *
|
|
|
+ * Number of jobs to be initialized is restricted by two values: - Maximum
|
|
|
+ * number of users whose jobs we want to initialize, which is equal to
|
|
|
+ * the number of concurrent users the queue can support. - Maximum number
|
|
|
+ * of initialized jobs per user. The product of these two gives us the
|
|
|
+ * total number of initialized jobs.
|
|
|
*
|
|
|
- * The jobs are selected such that they are within the limits
|
|
|
- * for number of users and number of jobs per user in the queue.
|
|
|
- * The only exception is if high priority jobs are waiting to be
|
|
|
- * initialized. In that case, we could exceed the configured limits.
|
|
|
- * However, we try to restrict the excess to a minimum.
|
|
|
+ * Note that this is a rough number, meant for decreasing extra memory
|
|
|
+ * footprint. It's OK if we go over it once in a while, if we have to.
|
|
|
+ *
|
|
|
+ * This can happen as follows. Suppose we have initialized 3 jobs for a
|
|
|
+ * user. Now, suppose the user submits a job who's priority is higher than
|
|
|
+ * that of the 3 jobs initialized. This job needs to be initialized, since it
|
|
|
+ * will run earlier than the 3 jobs. We'll now have 4 initialized jobs for the
|
|
|
+ * user. If memory becomes a problem, we should ideally un-initialize one of
|
|
|
+ * the 3 jobs, to keep the count of initialized jobs at 3, but that's
|
|
|
+ * something we don't do for now. This situation can also arise when a new
|
|
|
+ * user submits a high priority job, thus superceeding a user whose jobs have
|
|
|
+ * already been initialized. The latter user's initialized jobs are redundant,
|
|
|
+ * but we'll leave them initialized.
|
|
|
+ *
|
|
|
+ * @param queue name of the queue to pick the jobs to initialize.
|
|
|
+ * @return list of jobs to be initalized in a queue. An empty queue is
|
|
|
+ * returned if no jobs are found.
|
|
|
*/
|
|
|
ArrayList<JobInProgress> getJobsToInitialize(String queue) {
|
|
|
QueueInfo qi = jobQueues.get(queue);
|
|
@@ -385,116 +440,131 @@ public class JobInitializationPoller extends Thread {
|
|
|
// queue.
|
|
|
int maximumUsersAllowedToInitialize = qi.maxUsersAllowedToInitialize;
|
|
|
int maxJobsPerUserAllowedToInitialize = qi.maxJobsPerUserToInitialize;
|
|
|
- // calculate maximum number of jobs which can be allowed to initialize
|
|
|
- // for this queue.
|
|
|
- // This value is used when a user submits a high priority job after we
|
|
|
- // have initialized jobs for that queue and none of them is scheduled.
|
|
|
- // This would prevent us from initializing extra jobs for that particular
|
|
|
- // user. Explanation given at end of method.
|
|
|
int maxJobsPerQueueToInitialize = maximumUsersAllowedToInitialize
|
|
|
* maxJobsPerUserAllowedToInitialize;
|
|
|
- Collection<JobInProgress> jobs = jobQueueManager.getJobs(queue);
|
|
|
int countOfJobsInitialized = 0;
|
|
|
HashMap<String, Integer> userJobsInitialized = new HashMap<String, Integer>();
|
|
|
+ Collection<JobInProgress> jobs = jobQueueManager.getWaitingJobs(queue);
|
|
|
+ /*
|
|
|
+ * Walk through the collection of waiting jobs.
|
|
|
+ * We maintain a map of jobs that have already been initialized. If a
|
|
|
+ * job exists in that map, increment the count for that job's user
|
|
|
+ * and move on to the next job.
|
|
|
+ *
|
|
|
+ * If the job doesn't exist, see whether we want to initialize it.
|
|
|
+ * We initialize it if: - at least one job of the user has already
|
|
|
+ * been initialized, but the user's total initialized jobs are below
|
|
|
+ * the limit, OR - this is a new user, and we haven't reached the limit
|
|
|
+ * for the number of users whose jobs we want to initialize. We break
|
|
|
+ * when we've reached the limit of maximum jobs to initialize.
|
|
|
+ */
|
|
|
for (JobInProgress job : jobs) {
|
|
|
- /*
|
|
|
- * First check if job has been scheduled or completed or killed. If so
|
|
|
- * then remove from uninitialised jobs. Remove from Job queue
|
|
|
- */
|
|
|
- if ((job.getStatus().getRunState() == JobStatus.RUNNING)
|
|
|
- && (job.runningMaps() > 0 || job.runningReduces() > 0
|
|
|
- || job.finishedMaps() > 0 || job.finishedReduces() > 0)) {
|
|
|
- LOG.debug("Removing from the queue " + job.getJobID());
|
|
|
- initializedJobs.remove(job.getJobID());
|
|
|
- jobQueueManager.removeJobFromQueue(job);
|
|
|
- continue;
|
|
|
- } else if (job.isComplete()) {
|
|
|
- LOG.debug("Removing from completed job from " + "the queue "
|
|
|
- + job.getJobID());
|
|
|
- initializedJobs.remove(job.getJobID());
|
|
|
- jobQueueManager.removeJobFromQueue(job);
|
|
|
- continue;
|
|
|
- }
|
|
|
String user = job.getProfile().getUser();
|
|
|
int numberOfJobs = userJobsInitialized.get(user) == null ? 0
|
|
|
: userJobsInitialized.get(user);
|
|
|
// If the job is already initialized then add the count against user
|
|
|
// then continue.
|
|
|
- if (initializedJobs.contains(job.getJobID())) {
|
|
|
+ if (initializedJobs.containsKey(job.getJobID())) {
|
|
|
userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
|
|
|
countOfJobsInitialized++;
|
|
|
continue;
|
|
|
}
|
|
|
boolean isUserPresent = userJobsInitialized.containsKey(user);
|
|
|
- /*
|
|
|
- * If the user is present in user list and size of user list is less
|
|
|
- * maximum allowed users initialize then initialize this job and add this
|
|
|
- * user to the global list.
|
|
|
- *
|
|
|
- * Else if he is present we check if his number of jobs has not crossed
|
|
|
- * his quota and global quota.
|
|
|
- *
|
|
|
- * The logic behind using a global per queue job can be understood by example
|
|
|
- * below: Consider 3 users submitting normal priority job in a job queue with
|
|
|
- * user limit as 100. (Max jobs per user = 2)
|
|
|
- *
|
|
|
- * U1J1,U1J2,U1J3....,U3J3.
|
|
|
- *
|
|
|
- * Jobs initialized would be
|
|
|
- *
|
|
|
- * U1J1,U1J2,U2J1,U2J2,U3J1,U3J2
|
|
|
- *
|
|
|
- * Now consider a case where U4 comes in and submits a high priority job.
|
|
|
- *
|
|
|
- * U4J1 --- High Priority JOb, U4J2---- Normal priority job.
|
|
|
- *
|
|
|
- * So, if we dont use global per queue value we would end up initializing both
|
|
|
- * U4 jobs which is not correct.
|
|
|
- *
|
|
|
- * By using a global value we ensure that we dont initialize any extra jobs
|
|
|
- * for a user.
|
|
|
- */
|
|
|
if (!isUserPresent
|
|
|
&& userJobsInitialized.size() < maximumUsersAllowedToInitialize) {
|
|
|
// this is a new user being considered and the number of users
|
|
|
// is within limits.
|
|
|
userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
|
|
|
jobsToInitialize.add(job);
|
|
|
- initializedJobs.add(job.getJobID());
|
|
|
+ initializedJobs.put(job.getJobID(),job);
|
|
|
countOfJobsInitialized++;
|
|
|
} else if (isUserPresent
|
|
|
- && numberOfJobs < maxJobsPerUserAllowedToInitialize
|
|
|
- && countOfJobsInitialized < maxJobsPerQueueToInitialize) {
|
|
|
- /*
|
|
|
- * this is an existing user and the number of jobs per user
|
|
|
- * is within limits, as also the number of jobs per queue.
|
|
|
- * We need the check on number of jobs per queue to restrict
|
|
|
- * the number of jobs we initialize over the limit due to high
|
|
|
- * priority jobs.
|
|
|
- *
|
|
|
- * For e.g Consider 3 users submitting normal priority job in
|
|
|
- * a job queue with user limit as 100 and max jobs per user as 2
|
|
|
- * Say the jobs are U1J1,U1J2,U1J3....,U3J3.
|
|
|
- *
|
|
|
- * Jobs initialized would be U1J1,U1J2,U2J1,U2J2,U3J1,U3J2
|
|
|
- *
|
|
|
- * Now consider a case where U4 comes in and submits a high priority job
|
|
|
- * and a normal priority job. Say U4J1 and U4J2
|
|
|
- *
|
|
|
- * If we dont consider the number of jobs per queue we would end up
|
|
|
- * initializing both jobs from U4. Initializing the second job is
|
|
|
- * unnecessary.
|
|
|
- */
|
|
|
+ && numberOfJobs < maxJobsPerUserAllowedToInitialize) {
|
|
|
userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
|
|
|
jobsToInitialize.add(job);
|
|
|
- initializedJobs.add(job.getJobID());
|
|
|
+ initializedJobs.put(job.getJobID(),job);
|
|
|
countOfJobsInitialized++;
|
|
|
}
|
|
|
+ /*
|
|
|
+ * if the maximum number of jobs to initalize for a queue is reached
|
|
|
+ * then we stop looking at further jobs. The jobs beyond this number
|
|
|
+ * can be initialized.
|
|
|
+ */
|
|
|
+ if(countOfJobsInitialized > maxJobsPerQueueToInitialize) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
return jobsToInitialize;
|
|
|
}
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
+ * Method which is used internally to clean up the initialized jobs
|
|
|
+ * data structure which the job initialization poller uses to check
|
|
|
+ * if a job is initalized or not.
|
|
|
+ *
|
|
|
+ * Algorithm for cleaning up task is as follows:
|
|
|
+ *
|
|
|
+ * <ul>
|
|
|
+ * <li> For jobs in <b>initalizedJobs</b> list </li>
|
|
|
+ * <ul>
|
|
|
+ * <li> If job is running</li>
|
|
|
+ * <ul>
|
|
|
+ * <li> If job is scheduled then remove the job from the waiting queue
|
|
|
+ * of the scheduler and <b>initalizedJobs</b>.<br/>
|
|
|
+ * The check for a job is scheduled or not is done by following
|
|
|
+ * formulae:<br/>
|
|
|
+ * if pending <i>task</i> < desired <i>task</i> then scheduled else
|
|
|
+ * not scheduled.<br/>
|
|
|
+ * The formulae would return <i>scheduled</i> if one task has run or failed,
|
|
|
+ * any cases in which there has been a failure but not enough to mark task
|
|
|
+ * as failed, we return <i>not scheduled</i> in formulae.
|
|
|
+ * </li>
|
|
|
+ * </ul>
|
|
|
+ *
|
|
|
+ * <li> If job is complete, then remove the job from <b>initalizedJobs</b>.
|
|
|
+ * </li>
|
|
|
+ *
|
|
|
+ * </ul>
|
|
|
+ * </ul>
|
|
|
+ *
|
|
|
+ */
|
|
|
+ void cleanUpInitializedJobsList() {
|
|
|
+ Iterator<Entry<JobID, JobInProgress>> jobsIterator =
|
|
|
+ initializedJobs.entrySet().iterator();
|
|
|
+ while(jobsIterator.hasNext()) {
|
|
|
+ Entry<JobID,JobInProgress> entry = jobsIterator.next();
|
|
|
+ JobInProgress job = entry.getValue();
|
|
|
+ if (job.getStatus().getRunState() == JobStatus.RUNNING) {
|
|
|
+ if (isScheduled(job)) {
|
|
|
+ LOG.info("Removing scheduled jobs from waiting queue"
|
|
|
+ + job.getJobID());
|
|
|
+ jobsIterator.remove();
|
|
|
+ jobQueueManager.removeJobFromWaitingQueue(job);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if(job.isComplete()) {
|
|
|
+ LOG.info("Removing killed/completed job from initalized jobs " +
|
|
|
+ "list : "+ job.getJobID());
|
|
|
+ jobsIterator.remove();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Convenience method to check if job has been scheduled or not.
|
|
|
+ *
|
|
|
+ * The method may return false in case of job which has failure but
|
|
|
+ * has not failed the tip.
|
|
|
+ * @param job
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private boolean isScheduled(JobInProgress job) {
|
|
|
+ return ((job.pendingMaps() < job.desiredMaps())
|
|
|
+ || (job.pendingReduces() < job.desiredReduces()));
|
|
|
+ }
|
|
|
+
|
|
|
void terminate() {
|
|
|
running = false;
|
|
|
for (Entry<String, JobInitializationThread> entry : threadsToQueueMap
|
|
@@ -519,7 +589,7 @@ public class JobInitializationPoller extends Thread {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- HashSet<JobID> getInitializedJobList() {
|
|
|
- return initializedJobs;
|
|
|
+ Set<JobID> getInitializedJobList() {
|
|
|
+ return initializedJobs.keySet();
|
|
|
}
|
|
|
}
|