|
@@ -1130,28 +1130,33 @@ public class TaskTracker
|
|
|
* We no longer need anything from this task, as the job has
|
|
|
* finished. If the task is still running, kill it (and clean up
|
|
|
*/
|
|
|
- public synchronized void jobHasFinished() throws IOException {
|
|
|
-
|
|
|
- if (getRunState() == TaskStatus.State.RUNNING) {
|
|
|
+ public void jobHasFinished() throws IOException {
|
|
|
+ boolean killTask = false;
|
|
|
+ synchronized(this){
|
|
|
+ killTask = (getRunState() == TaskStatus.State.RUNNING);
|
|
|
+ if (killTask) {
|
|
|
killAndCleanup(false);
|
|
|
- } else {
|
|
|
- cleanup();
|
|
|
- }
|
|
|
- if (keepJobFiles)
|
|
|
- return;
|
|
|
-
|
|
|
- // Delete temp directory in case any task used PhasedFileSystem.
|
|
|
- try{
|
|
|
- String systemDir = task.getConf().get("mapred.system.dir");
|
|
|
- Path taskTempDir = new Path(systemDir + "/" +
|
|
|
- task.getJobId() + "/" + task.getTipId());
|
|
|
- if( fs.exists(taskTempDir)){
|
|
|
- fs.delete(taskTempDir) ;
|
|
|
}
|
|
|
- }catch(IOException e){
|
|
|
- LOG.warn("Error in deleting reduce temporary output",e);
|
|
|
+ }
|
|
|
+ if (!killTask) {
|
|
|
+ cleanup();
|
|
|
+ }
|
|
|
+ if (keepJobFiles)
|
|
|
+ return;
|
|
|
+
|
|
|
+ synchronized(this){
|
|
|
+ // Delete temp directory in case any task used PhasedFileSystem.
|
|
|
+ try{
|
|
|
+ String systemDir = task.getConf().get("mapred.system.dir");
|
|
|
+ Path taskTempDir = new Path(systemDir + "/" +
|
|
|
+ task.getJobId() + "/" + task.getTipId() + "/" + task.getTaskId());
|
|
|
+ if( fs.exists(taskTempDir)){
|
|
|
+ fs.delete(taskTempDir) ;
|
|
|
+ }
|
|
|
+ }catch(IOException e){
|
|
|
+ LOG.warn("Error in deleting reduce temporary output",e);
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
// Delete the job directory for this
|
|
|
// task if the job is done/failed
|
|
|
if (purgeJobFiles) {
|
|
@@ -1205,6 +1210,9 @@ public class TaskTracker
|
|
|
* We no longer need anything from this task. Either the
|
|
|
* controlling job is all done and the files have been copied
|
|
|
* away, or the task failed and we don't need the remains.
|
|
|
+ * Any calls to cleanup should not lock the tip first.
|
|
|
+ * cleanup does the right thing- updates tasks in Tasktracker
|
|
|
+ * by locking tasktracker first and then locks the tip.
|
|
|
*/
|
|
|
void cleanup() throws IOException {
|
|
|
String taskId = task.getTaskId();
|