|
@@ -76,6 +76,7 @@ import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
|
|
import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
|
|
import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
|
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
|
|
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
|
|
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
|
|
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
|
|
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
|
|
@@ -155,6 +156,7 @@ public class MRAppMaster extends CompositeService {
|
|
private boolean newApiCommitter;
|
|
private boolean newApiCommitter;
|
|
private OutputCommitter committer;
|
|
private OutputCommitter committer;
|
|
private JobEventDispatcher jobEventDispatcher;
|
|
private JobEventDispatcher jobEventDispatcher;
|
|
|
|
+ private JobHistoryEventHandler jobHistoryEventHandler;
|
|
private boolean inRecovery = false;
|
|
private boolean inRecovery = false;
|
|
private SpeculatorEventDispatcher speculatorEventDispatcher;
|
|
private SpeculatorEventDispatcher speculatorEventDispatcher;
|
|
|
|
|
|
@@ -502,9 +504,9 @@ public class MRAppMaster extends CompositeService {
|
|
|
|
|
|
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
|
|
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
|
|
AppContext context) {
|
|
AppContext context) {
|
|
- JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context,
|
|
|
|
- getStartCount());
|
|
|
|
- return eventHandler;
|
|
|
|
|
|
+ this.jobHistoryEventHandler = new JobHistoryEventHandler(context,
|
|
|
|
+ getStartCount());
|
|
|
|
+ return this.jobHistoryEventHandler;
|
|
}
|
|
}
|
|
|
|
|
|
protected Speculator createSpeculator(Configuration conf, AppContext context) {
|
|
protected Speculator createSpeculator(Configuration conf, AppContext context) {
|
|
@@ -659,6 +661,10 @@ public class MRAppMaster extends CompositeService {
|
|
public void handle(ContainerAllocatorEvent event) {
|
|
public void handle(ContainerAllocatorEvent event) {
|
|
this.containerAllocator.handle(event);
|
|
this.containerAllocator.handle(event);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+    /**
+     * Forwards the signalled flag to the wrapped container allocator. The
+     * shutdown hook uses this to tell the allocator that a signal has been
+     * received so that it does not take too long in shutting down.
+     *
+     * NOTE(review): assumes the wrapped allocator is always an RMCommunicator;
+     * confirm for every allocator this router can wrap.
+     *
+     * @param isSignalled the flag value to hand to the allocator
+     */
+    public void setSignalled(boolean isSignalled) {
+      // Pass the caller's value through instead of a hard-coded true, so the
+      // parameter is honoured.
+      ((RMCommunicator) containerAllocator).setSignalled(isSignalled);
+    }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -957,12 +963,16 @@ public class MRAppMaster extends CompositeService {
|
|
Integer.parseInt(nodePortString),
|
|
Integer.parseInt(nodePortString),
|
|
Integer.parseInt(nodeHttpPortString), appSubmitTime);
|
|
Integer.parseInt(nodeHttpPortString), appSubmitTime);
|
|
Runtime.getRuntime().addShutdownHook(
|
|
Runtime.getRuntime().addShutdownHook(
|
|
- new CompositeServiceShutdownHook(appMaster));
|
|
|
|
|
|
+ new MRAppMasterShutdownHook(appMaster));
|
|
YarnConfiguration conf = new YarnConfiguration(new JobConf());
|
|
YarnConfiguration conf = new YarnConfiguration(new JobConf());
|
|
conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
|
|
conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
|
|
String jobUserName = System
|
|
String jobUserName = System
|
|
.getenv(ApplicationConstants.Environment.USER.name());
|
|
.getenv(ApplicationConstants.Environment.USER.name());
|
|
conf.set(MRJobConfig.USER_NAME, jobUserName);
|
|
conf.set(MRJobConfig.USER_NAME, jobUserName);
|
|
|
|
+ // Do not automatically close FileSystem objects so that in case of
|
|
|
|
+ // SIGTERM I have a chance to write out the job history. I'll be closing
|
|
|
|
+ // the objects myself.
|
|
|
|
+ conf.setBoolean("fs.automatic.close", false);
|
|
initAndStartAppMaster(appMaster, conf, jobUserName);
|
|
initAndStartAppMaster(appMaster, conf, jobUserName);
|
|
} catch (Throwable t) {
|
|
} catch (Throwable t) {
|
|
LOG.fatal("Error starting MRAppMaster", t);
|
|
LOG.fatal("Error starting MRAppMaster", t);
|
|
@@ -970,6 +980,35 @@ public class MRAppMaster extends CompositeService {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+  /**
+   * The shutdown hook that runs when a signal is received AND during normal
+   * close of the JVM. It flags the container allocator and the job history
+   * event handler as signalled, stops the application master, and finally
+   * closes all FileSystem objects.
+   */
+  static class MRAppMasterShutdownHook extends Thread {
+    // The application master to stop when the JVM shuts down.
+    MRAppMaster appMaster;
+
+    MRAppMasterShutdownHook(MRAppMaster appMaster) {
+      this.appMaster = appMaster;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("MRAppMaster received a signal. Signaling RMCommunicator and "
+          + "JobHistoryEventHandler.");
+      try {
+        // Notify the JHEH and RMCommunicator that a SIGTERM has been received
+        // so that they don't take too long in shutting down
+        if (appMaster.containerAllocator instanceof ContainerAllocatorRouter) {
+          ((ContainerAllocatorRouter) appMaster.containerAllocator)
+              .setSignalled(true);
+        }
+        // The handler field is only set once createJobHistoryHandler has run.
+        if (appMaster.jobHistoryEventHandler != null) {
+          appMaster.jobHistoryEventHandler.setSignalled(true);
+        }
+        appMaster.stop();
+      } finally {
+        // The startup code sets fs.automatic.close to false, so this hook is
+        // responsible for closing the FileSystem objects. Do it in a finally
+        // block so that a failure while signalling or stopping the services
+        // cannot skip it.
+        try {
+          FileSystem.closeAll();
+        } catch (IOException ioe) {
+          LOG.warn("Failed to close all FileSystem objects", ioe);
+        }
+      }
+    }
+  }
+
|
|
|
|
+
|
|
protected static void initAndStartAppMaster(final MRAppMaster appMaster,
|
|
protected static void initAndStartAppMaster(final MRAppMaster appMaster,
|
|
final YarnConfiguration conf, String jobUserName) throws IOException,
|
|
final YarnConfiguration conf, String jobUserName) throws IOException,
|
|
InterruptedException {
|
|
InterruptedException {
|