|
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
|
@@ -98,6 +99,7 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
|
|
|
private boolean useFixedPorts;
|
|
|
private boolean useRpc = false;
|
|
|
+ private int failoverTimeout;
|
|
|
|
|
|
private ConcurrentMap<ApplicationAttemptId, Long> appMasters =
|
|
|
new ConcurrentHashMap<ApplicationAttemptId, Long>(16, 0.75f, 2);
|
|
@@ -189,12 +191,15 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS);
|
|
|
useRpc = conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC,
|
|
|
YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC);
|
|
|
+ failoverTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
|
|
|
+ YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
|
|
|
|
|
|
if (useRpc && !useFixedPorts) {
|
|
|
throw new YarnRuntimeException("Invalid configuration!" +
|
|
|
" Minicluster can use rpc only when configured to use fixed ports");
|
|
|
}
|
|
|
|
|
|
+ conf.setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true);
|
|
|
if (resourceManagers.length > 1) {
|
|
|
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
|
|
if (conf.get(YarnConfiguration.RM_HA_IDS) == null) {
|
|
@@ -218,6 +223,13 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
// Don't try to login using keytab in the testcases.
|
|
|
}
|
|
|
};
|
|
|
+ if (!useFixedPorts) {
|
|
|
+ if (HAUtil.isHAEnabled(conf)) {
|
|
|
+ setHARMConfiguration(i, conf);
|
|
|
+ } else {
|
|
|
+ setNonHARMConfiguration(conf);
|
|
|
+ }
|
|
|
+ }
|
|
|
addService(new ResourceManagerWrapper(i));
|
|
|
}
|
|
|
for(int index = 0; index < nodeManagers.length; index++) {
|
|
@@ -230,18 +242,103 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
|
|
|
}
|
|
|
|
|
|
+ private void setNonHARMConfiguration(Configuration conf) {
|
|
|
+ String hostname = MiniYARNCluster.getHostname();
|
|
|
+ conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0");
|
|
|
+ conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0");
|
|
|
+ conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0");
|
|
|
+ conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0");
|
|
|
+ WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void setHARMConfiguration(final int index, Configuration conf) {
|
|
|
+ String hostname = MiniYARNCluster.getHostname();
|
|
|
+ for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
|
|
|
+ conf.set(HAUtil.addSuffix(confKey, rmIds[index]), hostname + ":0");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void initResourceManager(int index, Configuration conf) {
|
|
|
+ if (HAUtil.isHAEnabled(conf)) {
|
|
|
+ conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]);
|
|
|
+ }
|
|
|
+ resourceManagers[index].init(conf);
|
|
|
+ resourceManagers[index].getRMContext().getDispatcher().register(
|
|
|
+ RMAppAttemptEventType.class,
|
|
|
+ new EventHandler<RMAppAttemptEvent>() {
|
|
|
+ public void handle(RMAppAttemptEvent event) {
|
|
|
+ if (event instanceof RMAppAttemptRegistrationEvent) {
|
|
|
+ appMasters.put(event.getApplicationAttemptId(),
|
|
|
+ event.getTimestamp());
|
|
|
+ } else if (event instanceof RMAppAttemptUnregistrationEvent) {
|
|
|
+ appMasters.remove(event.getApplicationAttemptId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void startResourceManager(final int index) {
|
|
|
+ try {
|
|
|
+ Thread rmThread = new Thread() {
|
|
|
+ public void run() {
|
|
|
+ resourceManagers[index].start();
|
|
|
+ }
|
|
|
+ };
|
|
|
+ rmThread.setName("RM-" + index);
|
|
|
+ rmThread.start();
|
|
|
+ int waitCount = 0;
|
|
|
+ while (resourceManagers[index].getServiceState() == STATE.INITED
|
|
|
+ && waitCount++ < 60) {
|
|
|
+ LOG.info("Waiting for RM to start...");
|
|
|
+ Thread.sleep(1500);
|
|
|
+ }
|
|
|
+ if (resourceManagers[index].getServiceState() != STATE.STARTED) {
|
|
|
+ // RM could have failed.
|
|
|
+ throw new IOException(
|
|
|
+ "ResourceManager failed to start. Final state is "
|
|
|
+ + resourceManagers[index].getServiceState());
|
|
|
+ }
|
|
|
+ } catch (Throwable t) {
|
|
|
+ throw new YarnRuntimeException(t);
|
|
|
+ }
|
|
|
+ LOG.info("MiniYARN ResourceManager address: " +
|
|
|
+ getConfig().get(YarnConfiguration.RM_ADDRESS));
|
|
|
+ LOG.info("MiniYARN ResourceManager web address: " +
|
|
|
+ WebAppUtils.getRMWebAppURLWithoutScheme(getConfig()));
|
|
|
+ }
|
|
|
+
|
|
|
+ @InterfaceAudience.Private
|
|
|
+ @VisibleForTesting
|
|
|
+ public synchronized void stopResourceManager(int index) {
|
|
|
+ if (resourceManagers[index] != null) {
|
|
|
+ resourceManagers[index].stop();
|
|
|
+ resourceManagers[index] = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @InterfaceAudience.Private
|
|
|
+ @VisibleForTesting
|
|
|
+ public synchronized void restartResourceManager(int index)
|
|
|
+ throws InterruptedException {
|
|
|
+ if (resourceManagers[index] != null) {
|
|
|
+ resourceManagers[index].stop();
|
|
|
+ resourceManagers[index] = null;
|
|
|
+ }
|
|
|
+ Configuration conf = getConfig();
|
|
|
+ resourceManagers[index] = new ResourceManager();
|
|
|
+ initResourceManager(index, getConfig());
|
|
|
+ startResourceManager(index);
|
|
|
+ }
|
|
|
+
|
|
|
public File getTestWorkDir() {
|
|
|
return testWorkDir;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * In a HA cluster, go through all the RMs and find the Active RM. If none
|
|
|
- * of them are active, wait upto 5 seconds for them to transition to Active.
|
|
|
+ * In a HA cluster, go through all the RMs and find the Active RM. In a
|
|
|
+ * non-HA cluster, return the index of the only RM.
|
|
|
*
|
|
|
- * In an non-HA cluster, return the index of the only RM.
|
|
|
- *
|
|
|
- * @return index of the active RM or -1 if none of them transition to
|
|
|
- * active even after 5 seconds of waiting
|
|
|
+ * @return index of the active RM or -1 if none of them turn active
|
|
|
*/
|
|
|
@InterfaceAudience.Private
|
|
|
@VisibleForTesting
|
|
@@ -250,9 +347,12 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
- int numRetriesForRMBecomingActive = 5;
|
|
|
+ int numRetriesForRMBecomingActive = failoverTimeout / 100;
|
|
|
while (numRetriesForRMBecomingActive-- > 0) {
|
|
|
for (int i = 0; i < resourceManagers.length; i++) {
|
|
|
+ if (resourceManagers[i] == null) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
try {
|
|
|
if (HAServiceProtocol.HAServiceState.ACTIVE ==
|
|
|
resourceManagers[i].getRMContext().getRMAdminService()
|
|
@@ -265,7 +365,7 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
}
|
|
|
}
|
|
|
try {
|
|
|
- Thread.sleep(1000);
|
|
|
+ Thread.sleep(100);
|
|
|
} catch (InterruptedException e) {
|
|
|
throw new YarnRuntimeException("Interrupted while waiting for one " +
|
|
|
"of the ResourceManagers to become active");
|
|
@@ -282,7 +382,7 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
int activeRMIndex = getActiveRMIndex();
|
|
|
return activeRMIndex == -1
|
|
|
? null
|
|
|
- : this.resourceManagers[getActiveRMIndex()];
|
|
|
+ : this.resourceManagers[activeRMIndex];
|
|
|
}
|
|
|
|
|
|
public ResourceManager getResourceManager(int i) {
|
|
@@ -310,82 +410,21 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
index = i;
|
|
|
}
|
|
|
|
|
|
- private void setNonHARMConfiguration(Configuration conf) {
|
|
|
- String hostname = MiniYARNCluster.getHostname();
|
|
|
- conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0");
|
|
|
- conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0");
|
|
|
- conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0");
|
|
|
- conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0");
|
|
|
- WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0);
|
|
|
- }
|
|
|
-
|
|
|
- private void setHARMConfiguration(Configuration conf) {
|
|
|
- String hostname = MiniYARNCluster.getHostname();
|
|
|
- for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
|
|
|
- for (String id : HAUtil.getRMHAIds(conf)) {
|
|
|
- conf.set(HAUtil.addSuffix(confKey, id), hostname + ":0");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
protected synchronized void serviceInit(Configuration conf)
|
|
|
throws Exception {
|
|
|
- conf.setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true);
|
|
|
-
|
|
|
- if (!useFixedPorts) {
|
|
|
- if (HAUtil.isHAEnabled(conf)) {
|
|
|
- setHARMConfiguration(conf);
|
|
|
- } else {
|
|
|
- setNonHARMConfiguration(conf);
|
|
|
- }
|
|
|
- }
|
|
|
- if (HAUtil.isHAEnabled(conf)) {
|
|
|
- conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]);
|
|
|
- }
|
|
|
- resourceManagers[index].init(conf);
|
|
|
- resourceManagers[index].getRMContext().getDispatcher().register
|
|
|
- (RMAppAttemptEventType.class,
|
|
|
- new EventHandler<RMAppAttemptEvent>() {
|
|
|
- public void handle(RMAppAttemptEvent event) {
|
|
|
- if (event instanceof RMAppAttemptRegistrationEvent) {
|
|
|
- appMasters.put(event.getApplicationAttemptId(), event.getTimestamp());
|
|
|
- } else if (event instanceof RMAppAttemptUnregistrationEvent) {
|
|
|
- appMasters.remove(event.getApplicationAttemptId());
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
+ initResourceManager(index, conf);
|
|
|
super.serviceInit(conf);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
protected synchronized void serviceStart() throws Exception {
|
|
|
- try {
|
|
|
- new Thread() {
|
|
|
- public void run() {
|
|
|
- resourceManagers[index].start();
|
|
|
- }
|
|
|
- }.start();
|
|
|
- int waitCount = 0;
|
|
|
- while (resourceManagers[index].getServiceState() == STATE.INITED
|
|
|
- && waitCount++ < 60) {
|
|
|
- LOG.info("Waiting for RM to start...");
|
|
|
- Thread.sleep(1500);
|
|
|
- }
|
|
|
- if (resourceManagers[index].getServiceState() != STATE.STARTED) {
|
|
|
- // RM could have failed.
|
|
|
- throw new IOException(
|
|
|
- "ResourceManager failed to start. Final state is "
|
|
|
- + resourceManagers[index].getServiceState());
|
|
|
- }
|
|
|
- super.serviceStart();
|
|
|
- } catch (Throwable t) {
|
|
|
- throw new YarnRuntimeException(t);
|
|
|
- }
|
|
|
+ startResourceManager(index);
|
|
|
LOG.info("MiniYARN ResourceManager address: " +
|
|
|
getConfig().get(YarnConfiguration.RM_ADDRESS));
|
|
|
LOG.info("MiniYARN ResourceManager web address: " +
|
|
|
WebAppUtils.getRMWebAppURLWithoutScheme(getConfig()));
|
|
|
+ super.serviceStart();
|
|
|
}
|
|
|
|
|
|
private void waitForAppMastersToFinish(long timeoutMillis) throws InterruptedException {
|
|
@@ -406,7 +445,6 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
waitForAppMastersToFinish(5000);
|
|
|
resourceManagers[index].stop();
|
|
|
}
|
|
|
- super.serviceStop();
|
|
|
|
|
|
if (Shell.WINDOWS) {
|
|
|
// On Windows, clean up the short temporary symlink that was created to
|
|
@@ -420,6 +458,7 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
testWorkDir.getAbsolutePath());
|
|
|
}
|
|
|
}
|
|
|
+ super.serviceStop();
|
|
|
}
|
|
|
}
|
|
|
|