Bläddra i källkod

HADOOP-1190. Fix unchecked warnings in mapred package.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@530153 13f79535-47bb-0310-9956-ffa450edef68
Thomas White 18 år sedan
förälder
incheckning
c002c20f14

+ 7 - 5
src/java/org/apache/hadoop/mapred/DefaultJobHistoryParser.java

@@ -158,9 +158,11 @@ public class DefaultJobHistoryParser {
   
   // call this only for jobs that succeeded for better results. 
   static class BadNodesFilter implements JobHistory.Listener {
-    private Map<String, Set<String>> badNodesToNumFaiedTasks = new HashMap(); 
+    private Map<String, Set<String>> badNodesToNumFailedTasks =
+      new HashMap<String, Set<String>>();
+    
     Map<String, Set<String>> getValues(){
-      return badNodesToNumFaiedTasks; 
+      return badNodesToNumFailedTasks; 
     }
     public void handle(JobHistory.RecordTypes recType, Map<Keys, String> values)
       throws IOException {
@@ -171,11 +173,11 @@ public class DefaultJobHistoryParser {
         if( Values.FAILED.name().equals(values.get(Keys.TASK_STATUS) )  ){
           String hostName = values.get(Keys.HOSTNAME) ;
           String taskid = values.get(Keys.TASKID); 
-          Set tasks = badNodesToNumFaiedTasks.get(hostName); 
+          Set<String> tasks = badNodesToNumFailedTasks.get(hostName); 
           if( null == tasks  ){
-            tasks = new TreeSet(); 
+            tasks = new TreeSet<String>(); 
             tasks.add(taskid);
-            badNodesToNumFaiedTasks.put(hostName, tasks);
+            badNodesToNumFailedTasks.put(hostName, tasks);
           }else{
             tasks.add(taskid);
           }

+ 5 - 4
src/java/org/apache/hadoop/mapred/FileInputFormat.java

@@ -81,7 +81,7 @@ public abstract class FileInputFormat implements InputFormat {
     if (dirs.length == 0) {
       throw new IOException("No input paths specified in job");
     }
-    List<Path> result = new ArrayList(); 
+    List<Path> result = new ArrayList<Path>(); 
     for (Path p: dirs) {
       FileSystem fs = p.getFileSystem(job); 
       Path[] matches =
@@ -100,7 +100,7 @@ public abstract class FileInputFormat implements InputFormat {
       throw new IOException("No input paths specified in input"); 
     }
     
-    List<IOException> result = new ArrayList();
+    List<IOException> result = new ArrayList<IOException>();
     int totalFiles = 0; 
     for (Path p: inputDirs) {
       FileSystem fs = p.getFileSystem(job);
@@ -161,7 +161,8 @@ public abstract class FileInputFormat implements InputFormat {
     long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
                             minSplitSize);
 
-    ArrayList splits = new ArrayList(numSplits);  // generate splits
+    // generate splits
+    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
     for (int i = 0; i < files.length; i++) {
       Path file = files[i];
       FileSystem fs = file.getFileSystem(job);
@@ -188,7 +189,7 @@ public abstract class FileInputFormat implements InputFormat {
       }
     }
     LOG.debug( "Total # of splits: " + splits.size() );
-    return (FileSplit[])splits.toArray(new FileSplit[splits.size()]);
+    return splits.toArray(new FileSplit[splits.size()]);
   }
 
   private static long computeSplitSize(long goalSize, long minSize,

+ 2 - 2
src/java/org/apache/hadoop/mapred/IsolationRunner.java

@@ -88,7 +88,7 @@ public class IsolationRunner {
   
   private static ClassLoader makeClassLoader(JobConf conf, 
                                              File workDir) throws IOException {
-    List cp = new ArrayList();
+    List<URL> cp = new ArrayList<URL>();
 
     String jar = conf.getJar();
     if (jar != null) {                      // if jar exists, it into workDir
@@ -101,7 +101,7 @@ public class IsolationRunner {
       cp.add(new URL("file:" + new File(workDir, "classes/").toString()));
       cp.add(new URL("file:" + workDir.toString() + "/"));
     }
-    return new URLClassLoader((URL[]) cp.toArray(new URL[cp.size()]));
+    return new URLClassLoader(cp.toArray(new URL[cp.size()]));
   }
   
   /**

+ 4 - 4
src/java/org/apache/hadoop/mapred/JobClient.java

@@ -334,11 +334,11 @@ public class JobClient extends ToolBase implements MRConstants  {
       job.getInputFormat().getSplits(job, job.getNumMapTasks());
     // sort the splits into order based on size, so that the biggest
     // go first
-    Arrays.sort(splits, new Comparator() {
-        public int compare(Object a, Object b) {
+    Arrays.sort(splits, new Comparator<InputSplit>() {
+        public int compare(InputSplit a, InputSplit b) {
           try {
-            long left = ((InputSplit) a).getLength();
-            long right = ((InputSplit) b).getLength();
+            long left = a.getLength();
+            long right = b.getLength();
             if (left == right) {
               return 0;
             } else if (left < right) {

+ 9 - 6
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -64,7 +64,8 @@ class JobInProgress {
   int failedMapTasks = 0 ; 
   int failedReduceTasks = 0 ; 
   JobTracker jobtracker = null;
-  Map<String,List<TaskInProgress>> hostToMaps = new HashMap();
+  Map<String,List<TaskInProgress>> hostToMaps =
+    new HashMap<String,List<TaskInProgress>>();
   private int taskCompletionEventTracker = 0 ; 
   List<TaskCompletionEvent> taskCompletionEvents ;
     
@@ -120,7 +121,7 @@ class JobInProgress {
 
     this.numMapTasks = conf.getNumMapTasks();
     this.numReduceTasks = conf.getNumReduceTasks();
-    this.taskCompletionEvents = new ArrayList(
+    this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
                                               numMapTasks + numReduceTasks + 10);
         
     JobHistory.JobInfo.logSubmitted(jobid, conf.getJobName(), conf.getUser(), 
@@ -184,7 +185,7 @@ class JobInProgress {
       for(String host: splits[i].getLocations()) {
         List<TaskInProgress> hostMaps = hostToMaps.get(host);
         if (hostMaps == null) {
-          hostMaps = new ArrayList();
+          hostMaps = new ArrayList<TaskInProgress>();
           hostToMaps.put(host, hostMaps);
         }
         hostMaps.add(maps[i]);              
@@ -280,10 +281,12 @@ class JobInProgress {
   }
     
   /**
-   * Return a treeset of completed TaskInProgress objects
+   * Return a vector of completed TaskInProgress objects
    */
-  public Vector reportTasksInProgress(boolean shouldBeMap, boolean shouldBeComplete) {
-    Vector results = new Vector();
+  public Vector<TaskInProgress> reportTasksInProgress(boolean shouldBeMap,
+      boolean shouldBeComplete) {
+    
+    Vector<TaskInProgress> results = new Vector<TaskInProgress>();
     TaskInProgress tips[] = null;
     if (shouldBeMap) {
       tips = maps;

+ 87 - 76
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -158,7 +158,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
      * but that have not yet been seen in a status report.
      * map: task-id (String) -> time-assigned (Long)
      */
-    private Map launchingTasks = new LinkedHashMap();
+    private Map<String, Long> launchingTasks =
+      new LinkedHashMap<String, Long>();
       
     public void run() {
       while (shouldRun) {
@@ -169,16 +170,17 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
           LOG.debug("Starting launching task sweep");
           synchronized (JobTracker.this) {
             synchronized (launchingTasks) {
-              Iterator itr = launchingTasks.entrySet().iterator();
+              Iterator<Map.Entry<String, Long>> itr =
+                launchingTasks.entrySet().iterator();
               while (itr.hasNext()) {
-                Map.Entry pair = (Map.Entry) itr.next();
-                String taskId = (String) pair.getKey();
-                long age = now - ((Long) pair.getValue()).longValue();
+                Map.Entry<String, Long> pair = itr.next();
+                String taskId = pair.getKey();
+                long age = now - (pair.getValue()).longValue();
                 LOG.info(taskId + " is " + age + " ms debug.");
                 if (age > TASKTRACKER_EXPIRY_INTERVAL) {
                   LOG.info("Launching task " + taskId + " timed out.");
                   TaskInProgress tip = null;
-                  tip = (TaskInProgress) taskidToTIPMap.get(taskId);
+                  tip = taskidToTIPMap.get(taskId);
                   if (tip != null) {
                     JobInProgress job = tip.getJob();
                     String trackerName = getAssignedTracker(taskId);
@@ -269,7 +271,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                 long now = System.currentTimeMillis();
                 TaskTrackerStatus leastRecent = null;
                 while ((trackerExpiryQueue.size() > 0) &&
-                       ((leastRecent = (TaskTrackerStatus) trackerExpiryQueue.first()) != null) &&
+                       ((leastRecent = trackerExpiryQueue.first()) != null) &&
                        (now - leastRecent.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL)) {
                         
                   // Remove profile from head of queue
@@ -277,7 +279,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                   String trackerName = leastRecent.getTrackerName();
                         
                   // Figure out if last-seen time should be updated, or if tracker is dead
-                  TaskTrackerStatus newProfile = (TaskTrackerStatus) taskTrackers.get(leastRecent.getTrackerName());
+                  TaskTrackerStatus newProfile = taskTrackers.get(leastRecent.getTrackerName());
                   // Items might leave the taskTracker set through other means; the
                   // status stored in 'taskTrackers' might be null, which means the
                   // tracker has already been destroyed.
@@ -328,7 +330,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
       while (shouldRun) {
         try {
           Thread.sleep(RETIRE_JOB_CHECK_INTERVAL);
-          List<JobInProgress> retiredJobs = new ArrayList();
+          List<JobInProgress> retiredJobs = new ArrayList<JobInProgress>();
           long retireBefore = System.currentTimeMillis() - 
             RETIRE_JOB_INTERVAL;
           synchronized (jobsByArrival) {
@@ -507,27 +509,31 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
   //
 
   // All the known jobs.  (jobid->JobInProgress)
-  Map<String, JobInProgress> jobs = new TreeMap();
-  List<JobInProgress> jobsByArrival = new ArrayList();
+  Map<String, JobInProgress> jobs = new TreeMap<String, JobInProgress>();
+  List<JobInProgress> jobsByArrival = new ArrayList<JobInProgress>();
 
   // (user -> list of JobInProgress)
-  TreeMap<String, ArrayList<JobInProgress>> userToJobsMap = new TreeMap();
+  TreeMap<String, ArrayList<JobInProgress>> userToJobsMap =
+    new TreeMap<String, ArrayList<JobInProgress>>();
     
   // All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
-  Map<String, TaskInProgress> taskidToTIPMap = new TreeMap();
+  Map<String, TaskInProgress> taskidToTIPMap =
+    new TreeMap<String, TaskInProgress>();
 
   // (taskid --> trackerID) 
-  TreeMap taskidToTrackerMap = new TreeMap();
+  TreeMap<String, String> taskidToTrackerMap = new TreeMap<String, String>();
 
   // (trackerID->TreeSet of taskids running at that tracker)
-  TreeMap trackerToTaskMap = new TreeMap();
+  TreeMap<String, Set<String>> trackerToTaskMap =
+    new TreeMap<String, Set<String>>();
 
   // (trackerID -> TreeSet of completed taskids running at that tracker)
-  TreeMap<String, Set<String>> trackerToMarkedTasksMap = new TreeMap();
+  TreeMap<String, Set<String>> trackerToMarkedTasksMap =
+    new TreeMap<String, Set<String>>();
 
   // (trackerID --> last sent HeartBeatResponse)
   Map<String, HeartbeatResponse> trackerToHeartbeatResponseMap = 
-    new TreeMap();
+    new TreeMap<String, HeartbeatResponse>();
     
   //
   // Watch and expire TaskTracker objects using these structures.
@@ -535,8 +541,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
   //
   int totalMaps = 0;
   int totalReduces = 0;
-  private TreeMap taskTrackers = new TreeMap();
-  List<JobInProgress> jobInitQueue = new ArrayList();
+  private TreeMap<String, TaskTrackerStatus> taskTrackers =
+    new TreeMap<String, TaskTrackerStatus>();
+  List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();
   ExpireTrackers expireTrackers = new ExpireTrackers();
   Thread expireTrackersThread = null;
   RetireJobs retireJobs = new RetireJobs();
@@ -556,19 +563,20 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
    * object has been updated in the taskTracker table, the latest status is 
    * reinserted.  Otherwise, we assume the tracker has expired.
    */
-  TreeSet trackerExpiryQueue = new TreeSet(new Comparator() {
-      public int compare(Object o1, Object o2) {
-        TaskTrackerStatus p1 = (TaskTrackerStatus) o1;
-        TaskTrackerStatus p2 = (TaskTrackerStatus) o2;
-        if (p1.getLastSeen() < p2.getLastSeen()) {
-          return -1;
-        } else if (p1.getLastSeen() > p2.getLastSeen()) {
-          return 1;
-        } else {
-          return (p1.getTrackerName().compareTo(p2.getTrackerName()));
+  TreeSet<TaskTrackerStatus> trackerExpiryQueue =
+    new TreeSet<TaskTrackerStatus>(
+      new Comparator<TaskTrackerStatus>() {
+        public int compare(TaskTrackerStatus p1, TaskTrackerStatus p2) {
+          if (p1.getLastSeen() < p2.getLastSeen()) {
+            return -1;
+          } else if (p1.getLastSeen() > p2.getLastSeen()) {
+            return 1;
+          } else {
+            return (p1.getTrackerName().compareTo(p2.getTrackerName()));
+          }
         }
       }
-    });
+    );
 
   // Used to provide an HTML view on Job, Task, and TaskTracker structures
   StatusHttpServer infoServer;
@@ -746,9 +754,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     taskidToTrackerMap.put(taskid, taskTracker);
 
     // tracker --> taskid
-    TreeSet taskset = (TreeSet) trackerToTaskMap.get(taskTracker);
+    Set<String> taskset = trackerToTaskMap.get(taskTracker);
     if (taskset == null) {
-      taskset = new TreeSet();
+      taskset = new TreeSet<String>();
       trackerToTaskMap.put(taskTracker, taskset);
     }
     taskset.add(taskid);
@@ -759,11 +767,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     
   void removeTaskEntry(String taskid) {
     // taskid --> tracker
-    String tracker = (String) taskidToTrackerMap.remove(taskid);
+    String tracker = taskidToTrackerMap.remove(taskid);
 
     // tracker --> taskid
     if (tracker != null) {
-      TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker);
+      Set<String> trackerSet = trackerToTaskMap.get(tracker);
       if (trackerSet != null) {
         trackerSet.remove(taskid);
       }
@@ -784,9 +792,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
    */
   void markCompletedTaskAttempt(String taskTracker, String taskid) {
     // tracker --> taskid
-    TreeSet taskset = (TreeSet) trackerToMarkedTasksMap.get(taskTracker);
+    Set<String> taskset = trackerToMarkedTasksMap.get(taskTracker);
     if (taskset == null) {
-      taskset = new TreeSet();
+      taskset = new TreeSet<String>();
       trackerToMarkedTasksMap.put(taskTracker, taskset);
     }
     taskset.add(taskid);
@@ -828,8 +836,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
    */
   private void removeMarkedTasks(String taskTracker) {
     // Purge all the 'marked' tasks which were running at taskTracker
-    TreeSet<String> markedTaskSet = 
-      (TreeSet<String>) trackerToMarkedTasksMap.get(taskTracker);
+    Set<String> markedTaskSet = 
+      trackerToMarkedTasksMap.get(taskTracker);
     if (markedTaskSet != null) {
       for (String taskid : markedTaskSet) {
         removeTaskEntry(taskid);
@@ -954,8 +962,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
   public long getStartTime() {
     return startTime;
   }
-  public Vector runningJobs() {
-    Vector v = new Vector();
+  public Vector<JobInProgress> runningJobs() {
+    Vector<JobInProgress> v = new Vector<JobInProgress>();
     for (Iterator it = jobs.values().iterator(); it.hasNext(); ) {
       JobInProgress jip = (JobInProgress) it.next();
       JobStatus status = jip.getStatus();
@@ -974,8 +982,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
       return (List<JobInProgress>) runningJobs();
     }
   }
-  public Vector failedJobs() {
-    Vector v = new Vector();
+  public Vector<JobInProgress> failedJobs() {
+    Vector<JobInProgress> v = new Vector<JobInProgress>();
     for (Iterator it = jobs.values().iterator(); it.hasNext(); ) {
       JobInProgress jip = (JobInProgress) it.next();
       JobStatus status = jip.getStatus();
@@ -985,8 +993,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     }
     return v;
   }
-  public Vector completedJobs() {
-    Vector v = new Vector();
+  public Vector<JobInProgress> completedJobs() {
+    Vector<JobInProgress> v = new Vector<JobInProgress>();
     for (Iterator it = jobs.values().iterator(); it.hasNext(); ) {
       JobInProgress jip = (JobInProgress) it.next();
       JobStatus status = jip.getStatus();
@@ -1003,7 +1011,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
   }
   public TaskTrackerStatus getTaskTracker(String trackerID) {
     synchronized (taskTrackers) {
-      return (TaskTrackerStatus) taskTrackers.get(trackerID);
+      return taskTrackers.get(trackerID);
     }
   }
 
@@ -1075,7 +1083,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
       
     // Initialize the response to be sent for the heartbeat
     HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
-    List<TaskTrackerAction> actions = new ArrayList();
+    List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
       
     // Check for new tasks to be executed on the tasktracker
     if (acceptNewTasks) {
@@ -1140,8 +1148,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
    */
   private boolean updateTaskTrackerStatus(String trackerName,
                                           TaskTrackerStatus status) {
-    TaskTrackerStatus oldStatus = 
-      (TaskTrackerStatus) taskTrackers.get(trackerName);
+    TaskTrackerStatus oldStatus = taskTrackers.get(trackerName);
     if (oldStatus != null) {
       totalMaps -= oldStatus.countMapTasks();
       totalReduces -= oldStatus.countReduceTasks();
@@ -1214,7 +1221,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
 	
     synchronized (taskTrackers) {
       numTaskTrackers = taskTrackers.size();
-      tts = (TaskTrackerStatus) taskTrackers.get(taskTracker);
+      tts = taskTrackers.get(taskTracker);
     }
     if (tts == null) {
       LOG.warn("Unknown task tracker polling; ignoring: " + taskTracker);
@@ -1346,13 +1353,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
    * A tracker wants to know if any of its Tasks have been
    * closed (because the job completed, whether successfully or not)
    */
-  private synchronized List getTasksToKill(String taskTracker) {
-    Set<String> taskIds = (TreeSet) trackerToTaskMap.get(taskTracker);
+  private synchronized List<TaskTrackerAction> getTasksToKill(
+      String taskTracker) {
+    
+    Set<String> taskIds = trackerToTaskMap.get(taskTracker);
     if (taskIds != null) {
-      List<TaskTrackerAction> killList = new ArrayList();
-      Set<String> killJobIds = new TreeSet(); 
+      List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
+      Set<String> killJobIds = new TreeSet<String>(); 
       for (String killTaskId : taskIds ) {
-        TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(killTaskId);
+        TaskInProgress tip = taskidToTIPMap.get(killTaskId);
         if (tip.shouldCloseForClosedJob(killTaskId)) {
           // 
           // This is how the JobTracker ends a task at the TaskTracker.
@@ -1447,12 +1456,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
   }
     
   public synchronized void killJob(String jobid) {
-    JobInProgress job = (JobInProgress) jobs.get(jobid);
+    JobInProgress job = jobs.get(jobid);
     job.kill();
   }
 
   public synchronized JobProfile getJobProfile(String jobid) {
-    JobInProgress job = (JobInProgress) jobs.get(jobid);
+    JobInProgress job = jobs.get(jobid);
     if (job != null) {
       return job.getProfile();
     } else {
@@ -1460,7 +1469,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     }
   }
   public synchronized JobStatus getJobStatus(String jobid) {
-    JobInProgress job = (JobInProgress) jobs.get(jobid);
+    JobInProgress job = jobs.get(jobid);
     if (job != null) {
       return job.getStatus();
     } else {
@@ -1468,7 +1477,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     }
   }
   public synchronized Counters getJobCounters(String jobid) {
-    JobInProgress job = (JobInProgress) jobs.get(jobid);
+    JobInProgress job = jobs.get(jobid);
     if (job != null) {
       return job.getCounters();
     } else {
@@ -1476,31 +1485,33 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     }
   }
   public synchronized TaskReport[] getMapTaskReports(String jobid) {
-    JobInProgress job = (JobInProgress) jobs.get(jobid);
+    JobInProgress job = jobs.get(jobid);
     if (job == null) {
       return new TaskReport[0];
     } else {
-      Vector reports = new Vector();
-      Vector completeMapTasks = job.reportTasksInProgress(true, true);
+      Vector<TaskReport> reports = new Vector<TaskReport>();
+      Vector<TaskInProgress> completeMapTasks =
+        job.reportTasksInProgress(true, true);
       for (Iterator it = completeMapTasks.iterator(); it.hasNext(); ) {
         TaskInProgress tip = (TaskInProgress) it.next();
         reports.add(tip.generateSingleReport());
       }
-      Vector incompleteMapTasks = job.reportTasksInProgress(true, false);
+      Vector<TaskInProgress> incompleteMapTasks =
+        job.reportTasksInProgress(true, false);
       for (Iterator it = incompleteMapTasks.iterator(); it.hasNext(); ) {
         TaskInProgress tip = (TaskInProgress) it.next();
         reports.add(tip.generateSingleReport());
       }
-      return (TaskReport[]) reports.toArray(new TaskReport[reports.size()]);
+      return reports.toArray(new TaskReport[reports.size()]);
     }
   }
 
   public synchronized TaskReport[] getReduceTaskReports(String jobid) {
-    JobInProgress job = (JobInProgress) jobs.get(jobid);
+    JobInProgress job = jobs.get(jobid);
     if (job == null) {
       return new TaskReport[0];
     } else {
-      Vector reports = new Vector();
+      Vector<TaskReport> reports = new Vector<TaskReport>();
       Vector completeReduceTasks = job.reportTasksInProgress(false, true);
       for (Iterator it = completeReduceTasks.iterator(); it.hasNext(); ) {
         TaskInProgress tip = (TaskInProgress) it.next();
@@ -1511,7 +1522,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         TaskInProgress tip = (TaskInProgress) it.next();
         reports.add(tip.generateSingleReport());
       }
-      return (TaskReport[]) reports.toArray(new TaskReport[reports.size()]);
+      return reports.toArray(new TaskReport[reports.size()]);
     }
   }
     
@@ -1523,7 +1534,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
   public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
                                                                     String jobid, int fromEventId, int maxEvents) throws IOException{
     TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
-    JobInProgress job = (JobInProgress)this.jobs.get(jobid);
+    JobInProgress job = this.jobs.get(jobid);
     if (null != job) {
       events = job.getTaskCompletionEvents(fromEventId, maxEvents);
     }
@@ -1540,7 +1551,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
   public synchronized List<String> getTaskDiagnostics(String jobId,
                                                       String tipId,
                                                       String taskId) {
-    JobInProgress job = (JobInProgress) jobs.get(jobId);
+    JobInProgress job = jobs.get(jobId);
     if (job == null) {
       throw new IllegalArgumentException("Job " + jobId + " not found.");
     }
@@ -1577,7 +1588,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
    * Returns specified TaskInProgress, or null.
    */
   private TaskInProgress getTip(String jobid, String tipid) {
-    JobInProgress job = (JobInProgress) jobs.get(jobid);
+    JobInProgress job = jobs.get(jobid);
     return (job == null ? null 
             : (TaskInProgress) job.getTaskInProgress(tipid));
   }
@@ -1588,11 +1599,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
    * @return The name of the task tracker
    */
   public synchronized String getAssignedTracker(String taskId) {
-    return (String) taskidToTrackerMap.get(taskId);
+    return taskidToTrackerMap.get(taskId);
   }
     
   public JobStatus[] jobsToComplete() {
-    Vector v = new Vector();
+    Vector<JobStatus> v = new Vector<JobStatus>();
     for (Iterator it = jobs.values().iterator(); it.hasNext(); ) {
       JobInProgress jip = (JobInProgress) it.next();
       JobStatus status = jip.getStatus();
@@ -1603,14 +1614,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         v.add(status);
       }
     }
-    return (JobStatus[]) v.toArray(new JobStatus[v.size()]);
+    return v.toArray(new JobStatus[v.size()]);
   } 
     
   ///////////////////////////////////////////////////////////////
   // JobTracker methods
   ///////////////////////////////////////////////////////////////
   public JobInProgress getJob(String jobid) {
-    return (JobInProgress) jobs.get(jobid);
+    return jobs.get(jobid);
   }
   /**
    * Grab random num for job id
@@ -1633,7 +1644,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     for (TaskStatus report : status.getTaskReports()) {
       report.setTaskTracker(status.getTrackerName());
       String taskId = report.getTaskId();
-      TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
+      TaskInProgress tip = taskidToTIPMap.get(taskId);
       if (tip == null) {
         LOG.info("Serious problem.  While updating status, cannot find taskid " + report.getTaskId());
       } else {
@@ -1650,13 +1661,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
    */
   void lostTaskTracker(String trackerName, String hostname) {
     LOG.info("Lost tracker '" + trackerName + "'");
-    TreeSet lostTasks = (TreeSet) trackerToTaskMap.get(trackerName);
+    Set<String> lostTasks = trackerToTaskMap.get(trackerName);
     trackerToTaskMap.remove(trackerName);
 
     if (lostTasks != null) {
       for (Iterator it = lostTasks.iterator(); it.hasNext(); ) {
         String taskId = (String) it.next();
-        TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
+        TaskInProgress tip = taskidToTIPMap.get(taskId);
 
         // Completed reduce tasks never need to be failed, because 
         // their outputs go to dfs

+ 7 - 7
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -37,7 +37,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
     LogFactory.getLog("org.apache.hadoop.mapred.LocalJobRunner");
 
   private FileSystem fs;
-  private HashMap jobs = new HashMap();
+  private HashMap<String, Job> jobs = new HashMap<String, Job>();
   private Configuration conf;
   private int map_tasks = 0;
   private int reduce_tasks = 0;
@@ -56,7 +56,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
     private Random random = new Random();
 
     private JobStatus status;
-    private ArrayList mapIds = new ArrayList();
+    private ArrayList<String> mapIds = new ArrayList<String>();
     private MapOutputFile mapoutputFile;
     private JobProfile profile;
     private Path localFile;
@@ -132,7 +132,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
         // move map output to reduce input
         String reduceId = "reduce_" + newId();
         for (int i = 0; i < mapIds.size(); i++) {
-          String mapId = (String)mapIds.get(i);
+          String mapId = mapIds.get(i);
           Path mapOut = this.mapoutputFile.getOutputFile(mapId);
           Path reduceIn = this.mapoutputFile.getInputFile(i, reduceId);
           if (!localFs.mkdirs(reduceIn.getParent())) {
@@ -252,11 +252,11 @@ class LocalJobRunner implements JobSubmissionProtocol {
   }
 
   public void killJob(String id) {
-    ((Thread)jobs.get(id)).stop();
+    jobs.get(id).stop();
   }
 
   public JobProfile getJobProfile(String id) {
-    Job job = (Job)jobs.get(id);
+    Job job = jobs.get(id);
     return job.getProfile();
   }
 
@@ -268,12 +268,12 @@ class LocalJobRunner implements JobSubmissionProtocol {
   }
 
   public JobStatus getJobStatus(String id) {
-    Job job = (Job)jobs.get(id);
+    Job job = jobs.get(id);
     return job.status;
   }
   
   public Counters getJobCounters(String id) {
-    Job job = (Job)jobs.get(id);
+    Job job = jobs.get(id);
     return job.currentCounters;
   }
 

+ 2 - 1
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -496,7 +496,8 @@ class MapTask extends Task {
         Sorter sorter = new Sorter(localFs, keyClass, valClass, job);
         
         for (int parts = 0; parts < partitions; parts++){
-          List<SegmentDescriptor> segmentList = new ArrayList(numSpills);
+          List<SegmentDescriptor> segmentList =
+            new ArrayList<SegmentDescriptor>(numSpills);
           for(int i = 0; i < numSpills; i++) {
             FSDataInputStream indexIn = localFs.open(indexFileName[i]);
             indexIn.seek(parts * 16);

+ 14 - 13
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -266,7 +266,7 @@ class ReduceTask extends Task {
     // since we don't know how many map outputs got merged in memory, we have
     // to check whether a given map output exists, and if it does, add it in
     // the list of files to merge, otherwise not.
-    List <Path> mapFilesList = new ArrayList();
+    List<Path> mapFilesList = new ArrayList<Path>();
     for(int i=0; i < numMaps; i++) {
       Path f = mapOutputFile.getInputFile(i, getTaskId());
       if (lfs.exists(f))
@@ -423,12 +423,12 @@ class ReduceTask extends Task {
     /**
      * the list of map outputs currently being copied
      */
-    private List scheduledCopies;
+    private List<MapOutputLocation> scheduledCopies;
     
     /**
      *  the results of dispatched copy attempts
      */
-    private List copyResults;
+    private List<CopyResult> copyResults;
     
     /**
      *  the number of outputs to copy in parallel
@@ -446,12 +446,12 @@ class ReduceTask extends Task {
      * busy hosts from which copies are being backed off
      * Map of host -> next contact time
      */
-    private Map penaltyBox;
+    private Map<String, Long> penaltyBox;
     
     /**
      * the set of unique hosts from which we are copying
      */
-    private Set uniqueHosts;
+    private Set<String> uniqueHosts;
     
     /**
      * the last time we polled the job tracker
@@ -511,7 +511,8 @@ class ReduceTask extends Task {
     /**
      * a hashmap from mapId to MapOutputLocation for retrials
      */
-    private Map<Integer, MapOutputLocation> retryFetches = new HashMap();
+    private Map<Integer, MapOutputLocation> retryFetches =
+      new HashMap<Integer, MapOutputLocation>();
     
     /** 
      * a TreeSet for needed map outputs
@@ -632,7 +633,7 @@ class ReduceTask extends Task {
               while (scheduledCopies.isEmpty()) {
                 scheduledCopies.wait();
               }
-              loc = (MapOutputLocation)scheduledCopies.remove(0);
+              loc = scheduledCopies.remove(0);
             }
             
             try {
@@ -774,8 +775,8 @@ class ReduceTask extends Task {
       configureClasspath(conf);
       this.umbilical = umbilical;      
       this.reduceTask = ReduceTask.this;
-      this.scheduledCopies = new ArrayList(100);
-      this.copyResults = new ArrayList(100);    
+      this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
+      this.copyResults = new ArrayList<CopyResult>(100);    
       this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
       this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
       this.mergeThreshold = conf.getInt("mapred.inmem.merge.threshold", 1000);
@@ -793,10 +794,10 @@ class ReduceTask extends Task {
                                 conf.getMapOutputValueClass(), conf);
       
       // hosts -> next contact time
-      this.penaltyBox = new Hashtable();
+      this.penaltyBox = new Hashtable<String, Long>();
       
       // hostnames
-      this.uniqueHosts = new HashSet();
+      this.uniqueHosts = new HashSet<String>();
       
       this.lastPollTime = 0;
       
@@ -896,7 +897,7 @@ class ReduceTask extends Task {
             while (locIt.hasNext()) {
               
               MapOutputLocation loc = (MapOutputLocation)locIt.next();
-              Long penaltyEnd = (Long)penaltyBox.get(loc.getHost());
+              Long penaltyEnd = penaltyBox.get(loc.getHost());
               boolean penalized = false, duplicate = false;
               
               if (penaltyEnd != null && currentTime < penaltyEnd.longValue()) {
@@ -1086,7 +1087,7 @@ class ReduceTask extends Task {
         if (copyResults.isEmpty()) {
           return null;
         } else {
-          return (CopyResult) copyResults.remove(0);
+          return copyResults.remove(0);
         }
       }    
     }

+ 11 - 10
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -89,18 +89,19 @@ class TaskInProgress {
     
   // Map from task Id -> TaskTracker Id, contains tasks that are
   // currently runnings
-  private TreeMap<String, String> activeTasks = new TreeMap();
+  private TreeMap<String, String> activeTasks = new TreeMap<String, String>();
   private JobConf conf;
   private boolean runSpeculative;
-  private Map<String,List<String>> taskDiagnosticData = new TreeMap();
+  private Map<String,List<String>> taskDiagnosticData =
+    new TreeMap<String,List<String>>();
   /**
    * Map from taskId -> TaskStatus
    */
   private TreeMap<String,TaskStatus> taskStatuses = 
     new TreeMap<String,TaskStatus>();
 
-  private TreeSet machinesWhereFailed = new TreeSet();
-  private TreeSet tasksReportedClosed = new TreeSet();
+  private TreeSet<String> machinesWhereFailed = new TreeSet<String>();
+  private TreeSet<String> tasksReportedClosed = new TreeSet<String>();
     
   private Counters counters = new Counters();
 
@@ -288,13 +289,13 @@ class TaskInProgress {
    * component task-threads that have ever been started.
    */
   synchronized TaskReport generateSingleReport() {
-    ArrayList diagnostics = new ArrayList();
-    for (Iterator i = taskDiagnosticData.values().iterator(); i.hasNext();) {
-      diagnostics.addAll((List)i.next());
+    ArrayList<String> diagnostics = new ArrayList<String>();
+    for (List<String> l : taskDiagnosticData.values()) {
+      diagnostics.addAll(l);
     }
     TaskReport report = new TaskReport
       (getTIPId(), (float)progress, state,
-       (String[])diagnostics.toArray(new String[diagnostics.size()]),
+       diagnostics.toArray(new String[diagnostics.size()]),
        execStartTime, execFinishTime, counters);
       
     return report ;
@@ -326,9 +327,9 @@ class TaskInProgress {
     boolean changed = true;
     if (diagInfo != null && diagInfo.length() > 0) {
       LOG.info("Error from "+taskid+": "+diagInfo);
-      List diagHistory = (List) taskDiagnosticData.get(taskid);
+      List<String> diagHistory = taskDiagnosticData.get(taskid);
       if (diagHistory == null) {
-        diagHistory = new ArrayList();
+        diagHistory = new ArrayList<String>();
         taskDiagnosticData.put(taskid, diagHistory);
       }
       diagHistory.add(diagInfo);

+ 13 - 13
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -115,7 +115,7 @@ public class TaskTracker
     
   boolean shuttingDown = false;
     
-  Map<String, TaskInProgress> tasks = new HashMap();
+  Map<String, TaskInProgress> tasks = new HashMap<String, TaskInProgress>();
   /**
    * Map from taskId -> TaskInProgress.
    */
@@ -126,7 +126,7 @@ public class TaskTracker
   boolean justStarted = true;
     
   //dir -> DF
-  Map localDirsDf = new HashMap();
+  Map<String, DF> localDirsDf = new HashMap<String, DF>();
   long minSpaceStart = 0;
   //must have this much space free to start new tasks
   boolean acceptNewTasks = true;
@@ -186,7 +186,7 @@ public class TaskTracker
    * A list of tips that should be cleaned up.
    */
   private BlockingQueue<TaskTrackerAction> tasksToCleanup = 
-    new LinkedBlockingQueue();
+    new LinkedBlockingQueue<TaskTrackerAction>();
     
   /**
    * A daemon-thread that pulls tips off the list of things to cleanup.
@@ -231,7 +231,7 @@ public class TaskTracker
       if (!runningJobs.containsKey(jobId)) {
         rJob = new RunningJob(jobId, localJobFile);
         rJob.localized = false;
-        rJob.tasks = new HashSet();
+        rJob.tasks = new HashSet<TaskInProgress>();
         rJob.jobFile = localJobFile;
         runningJobs.put(jobId, rJob);
       } else {
@@ -297,8 +297,8 @@ public class TaskTracker
 
     // Clear out state tables
     this.tasks.clear();
-    this.runningTasks = new TreeMap();
-    this.runningJobs = new TreeMap();
+    this.runningTasks = new TreeMap<String, TaskInProgress>();
+    this.runningJobs = new TreeMap<String, RunningJob>();
     this.mapTotal = 0;
     this.reduceTotal = 0;
     this.acceptNewTasks = true;
@@ -611,10 +611,10 @@ public class TaskTracker
     // Kill running tasks.  Do this in a 2nd vector, called 'tasksToClose',
     // because calling jobHasFinished() may result in an edit to 'tasks'.
     //
-    TreeMap tasksToClose = new TreeMap();
+    TreeMap<String, TaskInProgress> tasksToClose =
+      new TreeMap<String, TaskInProgress>();
     tasksToClose.putAll(tasks);
-    for (Iterator it = tasksToClose.values().iterator(); it.hasNext(); ) {
-      TaskInProgress tip = (TaskInProgress) it.next();
+    for (TaskInProgress tip : tasksToClose.values()) {
       tip.jobHasFinished();
     }
 
@@ -1048,7 +1048,7 @@ public class TaskTracker
     for (int i = 0; i < localDirs.length; i++) {
       DF df = null;
       if (localDirsDf.containsKey(localDirs[i])) {
-        df = (DF) localDirsDf.get(localDirs[i]);
+        df = localDirsDf.get(localDirs[i]);
       } else {
         df = new DF(new File(localDirs[i]), fConf);
         localDirsDf.put(localDirs[i], df);
@@ -1648,7 +1648,7 @@ public class TaskTracker
     RunningJob(String jobid, Path jobFile) {
       this.jobid = jobid;
       localized = false;
-      tasks = new HashSet();
+      tasks = new HashSet<TaskInProgress>();
       this.jobFile = jobFile;
       keepJobFiles = false;
     }
@@ -1768,7 +1768,7 @@ public class TaskTracker
    * @return a copy of the list of TaskStatus objects
    */
   synchronized List<TaskStatus> getRunningTaskStatuses() {
-    List<TaskStatus> result = new ArrayList(runningTasks.size());
+    List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
     for(TaskInProgress tip: runningTasks.values()) {
       result.add(tip.createStatus());
     }
@@ -1780,7 +1780,7 @@ public class TaskTracker
    * @return
    */
   synchronized List<TaskStatus> getNonRunningTasks() {
-    List<TaskStatus> result = new ArrayList(tasks.size());
+    List<TaskStatus> result = new ArrayList<TaskStatus>(tasks.size());
     for(Map.Entry<String, TaskInProgress> task: tasks.entrySet()) {
       if (!runningTasks.containsKey(task.getKey())) {
         result.add(task.getValue().createStatus());

+ 2 - 2
src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java

@@ -51,7 +51,7 @@ class TaskTrackerStatus implements Writable {
   /**
    */
   public TaskTrackerStatus() {
-    taskReports = new ArrayList();
+    taskReports = new ArrayList<TaskStatus>();
   }
 
   /**
@@ -63,7 +63,7 @@ class TaskTrackerStatus implements Writable {
     this.host = host;
     this.httpPort = httpPort;
 
-    this.taskReports = new ArrayList(taskReports);
+    this.taskReports = new ArrayList<TaskStatus>(taskReports);
     this.failures = failures;
   }
 

+ 32 - 39
src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java

@@ -48,11 +48,11 @@ public class JobControl implements Runnable{
 	
   private int runnerState;			// the thread state
 	
-  private Hashtable waitingJobs;
-  private Hashtable readyJobs;
-  private Hashtable runningJobs;
-  private Hashtable successfulJobs;
-  private Hashtable failedJobs;
+  private Hashtable<String, Job> waitingJobs;
+  private Hashtable<String, Job> readyJobs;
+  private Hashtable<String, Job> runningJobs;
+  private Hashtable<String, Job> successfulJobs;
+  private Hashtable<String, Job> failedJobs;
 	
   private long nextJobID;
   private String groupName;
@@ -62,23 +62,22 @@ public class JobControl implements Runnable{
    * @param groupName a name identifying this group
    */
   public JobControl(String groupName) {
-    this.waitingJobs = new Hashtable();
-    this.readyJobs = new Hashtable();
-    this.runningJobs = new Hashtable();
-    this.successfulJobs = new Hashtable();
-    this.failedJobs = new Hashtable();
+    this.waitingJobs = new Hashtable<String, Job>();
+    this.readyJobs = new Hashtable<String, Job>();
+    this.runningJobs = new Hashtable<String, Job>();
+    this.successfulJobs = new Hashtable<String, Job>();
+    this.failedJobs = new Hashtable<String, Job>();
     this.nextJobID = -1;
     this.groupName = groupName;
     this.runnerState = JobControl.READY;
 		
   }
 	
-  private static ArrayList toArrayList(Hashtable jobs) {
-    ArrayList retv = new ArrayList();
+  private static ArrayList<Job> toArrayList(Hashtable<String, Job> jobs) {
+    ArrayList<Job> retv = new ArrayList<Job>();
     synchronized (jobs) {
-      Iterator iter = jobs.values().iterator();
-      while (iter.hasNext()) {
-        retv.add(iter.next());
+      for (Job job : jobs.values()) {
+        retv.add(job);
       }
     }
 		
@@ -88,32 +87,32 @@ public class JobControl implements Runnable{
   /**
    * @return the jobs in the waiting state
    */
-  public ArrayList getWaitingJobs() {
+  public ArrayList<Job> getWaitingJobs() {
     return JobControl.toArrayList(this.waitingJobs);
   }
 	
   /**
    * @return the jobs in the running state
    */
-  public ArrayList getRunningJobs() {
+  public ArrayList<Job> getRunningJobs() {
     return JobControl.toArrayList(this.runningJobs);
   }
 	
   /**
    * @return the jobs in the ready state
    */
-  public ArrayList getReadyJobs() {
+  public ArrayList<Job> getReadyJobs() {
     return JobControl.toArrayList(this.readyJobs);
   }
 	
   /**
    * @return the jobs in the success state
    */
-  public ArrayList getSuccessfulJobs() {
+  public ArrayList<Job> getSuccessfulJobs() {
     return JobControl.toArrayList(this.successfulJobs);
   }
 	
-  public ArrayList getFailedJobs() {
+  public ArrayList<Job> getFailedJobs() {
     return JobControl.toArrayList(this.failedJobs);
   }
 	
@@ -122,19 +121,19 @@ public class JobControl implements Runnable{
     return this.groupName + this.nextJobID;
   }
 	
-  private static void addToQueue(Job aJob, Hashtable queue) {
+  private static void addToQueue(Job aJob, Hashtable<String, Job> queue) {
     synchronized(queue) {
       queue.put(aJob.getJobID(), aJob);
     }		
   }
 	
   private void addToQueue(Job aJob) {
-    Hashtable queue = getQueue(aJob.getState());
+    Hashtable<String, Job> queue = getQueue(aJob.getState());
     addToQueue(aJob, queue);	
   }
 	
-  private Hashtable getQueue(int state) {
-    Hashtable retv = null;
+  private Hashtable<String, Job> getQueue(int state) {
+    Hashtable<String, Job> retv = null;
     if (state == Job.WAITING) {
       retv = this.waitingJobs;
     } else if (state == Job.READY) {
@@ -208,13 +207,11 @@ public class JobControl implements Runnable{
 	
   synchronized private void checkRunningJobs() {
 		
-    Hashtable oldJobs = null;
+    Hashtable<String, Job> oldJobs = null;
     oldJobs = this.runningJobs;
-    this.runningJobs = new Hashtable();
+    this.runningJobs = new Hashtable<String, Job>();
 		
-    Iterator jobs = oldJobs.values().iterator();
-    while (jobs.hasNext()) {
-      Job nextJob = (Job)jobs.next();
+    for (Job nextJob : oldJobs.values()) {
       int state = nextJob.checkState();
       /*
         if (state != Job.RUNNING) {
@@ -227,13 +224,11 @@ public class JobControl implements Runnable{
   }
 	
   synchronized private void checkWaitingJobs() {
-    Hashtable oldJobs = null;
+    Hashtable<String, Job> oldJobs = null;
     oldJobs = this.waitingJobs;
-    this.waitingJobs = new Hashtable();
+    this.waitingJobs = new Hashtable<String, Job>();
 		
-    Iterator jobs = oldJobs.values().iterator();
-    while (jobs.hasNext()) {
-      Job nextJob = (Job)jobs.next();
+    for (Job nextJob : oldJobs.values()) {
       int state = nextJob.checkState();
       /*
         if (state != Job.WAITING) {
@@ -246,13 +241,11 @@ public class JobControl implements Runnable{
   }
 	
   synchronized private void startReadyJobs() {
-    Hashtable oldJobs = null;
+    Hashtable<String, Job> oldJobs = null;
     oldJobs = this.readyJobs;
-    this.readyJobs = new Hashtable();
+    this.readyJobs = new Hashtable<String, Job>();
 		
-    Iterator jobs = oldJobs.values().iterator();
-    while (jobs.hasNext()) {
-      Job nextJob = (Job)jobs.next();
+    for (Job nextJob : oldJobs.values()) {
       //System.out.println("Job to submit to Hadoop: " + nextJob.getJobName());
       nextJob.submit();
       //System.out.println("Hadoop ID: " + nextJob.getMapredJobID());