|
@@ -24,6 +24,8 @@ import java.util.Collection;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
import java.util.LinkedList;
|
|
import java.util.LinkedList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
+import java.util.HashMap;
|
|
|
|
+import java.util.HashSet;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -202,6 +204,9 @@ public class JobControl implements Runnable {
|
|
* Submit the jobs in ready state
|
|
* Submit the jobs in ready state
|
|
*/
|
|
*/
|
|
public void run() {
|
|
public void run() {
|
|
|
|
+ if (isCircular(jobsInProgress)) {
|
|
|
|
+ throw new IllegalArgumentException("job control has circular dependency");
|
|
|
|
+ }
|
|
try {
|
|
try {
|
|
this.runnerState = ThreadState.RUNNING;
|
|
this.runnerState = ThreadState.RUNNING;
|
|
while (true) {
|
|
while (true) {
|
|
@@ -281,4 +286,64 @@ public class JobControl implements Runnable {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Uses topological sorting algorithm for finding circular dependency
|
|
|
|
+ */
|
|
|
|
+ private boolean isCircular(final List<ControlledJob> jobList) {
|
|
|
|
+ boolean cyclePresent = false;
|
|
|
|
+ HashSet<ControlledJob> SourceSet = new HashSet<ControlledJob>();
|
|
|
|
+ HashMap<ControlledJob, List<ControlledJob>> processedMap =
|
|
|
|
+ new HashMap<ControlledJob, List<ControlledJob>>();
|
|
|
|
+ for (ControlledJob n : jobList) {
|
|
|
|
+ processedMap.put(n, new ArrayList<ControlledJob>());
|
|
|
|
+ }
|
|
|
|
+ for (ControlledJob n : jobList) {
|
|
|
|
+ if (!hasInComingEdge(n, jobList, processedMap)) {
|
|
|
|
+ SourceSet.add(n);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ while (!SourceSet.isEmpty()) {
|
|
|
|
+ ControlledJob controlledJob = SourceSet.iterator().next();
|
|
|
|
+ SourceSet.remove(controlledJob);
|
|
|
|
+ if (controlledJob.getDependentJobs() != null) {
|
|
|
|
+ for (int i = 0; i < controlledJob.getDependentJobs().size(); i++) {
|
|
|
|
+ ControlledJob depenControlledJob =
|
|
|
|
+ controlledJob.getDependentJobs().get(i);
|
|
|
|
+ processedMap.get(controlledJob).add(depenControlledJob);
|
|
|
|
+ if (!hasInComingEdge(controlledJob, jobList, processedMap)) {
|
|
|
|
+ SourceSet.add(depenControlledJob);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ for (ControlledJob controlledJob : jobList) {
|
|
|
|
+ if (controlledJob.getDependentJobs() != null
|
|
|
|
+ && controlledJob.getDependentJobs().size() != processedMap.get(
|
|
|
|
+ controlledJob).size()) {
|
|
|
|
+ cyclePresent = true;
|
|
|
|
+ LOG.error("Job control has circular dependency for the job "
|
|
|
|
+ + controlledJob.getJobName());
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return cyclePresent;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private boolean hasInComingEdge(ControlledJob controlledJob,
|
|
|
|
+ List<ControlledJob> controlledJobList,
|
|
|
|
+ HashMap<ControlledJob, List<ControlledJob>> processedMap) {
|
|
|
|
+ boolean hasIncomingEdge = false;
|
|
|
|
+ for (ControlledJob k : controlledJobList) {
|
|
|
|
+ if (k != controlledJob && k.getDependentJobs() != null
|
|
|
|
+ && !processedMap.get(k).contains(controlledJob)
|
|
|
|
+ && k.getDependentJobs().contains(controlledJob)) {
|
|
|
|
+ hasIncomingEdge = true;
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return hasIncomingEdge;
|
|
|
|
+
|
|
|
|
+ }
|
|
}
|
|
}
|