|
@@ -25,18 +25,21 @@ import java.net.UnknownHostException;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileContext;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
-import org.apache.hadoop.http.HttpConfig;
|
|
|
+import org.apache.hadoop.ha.HAServiceProtocol;
|
|
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
import org.apache.hadoop.service.AbstractService;
|
|
|
import org.apache.hadoop.service.CompositeService;
|
|
|
import org.apache.hadoop.util.Shell;
|
|
|
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
+import org.apache.hadoop.yarn.conf.HAUtil;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
@@ -87,7 +90,7 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
}
|
|
|
|
|
|
private NodeManager[] nodeManagers;
|
|
|
- private ResourceManager resourceManager;
|
|
|
+ private ResourceManager[] resourceManagers;
|
|
|
|
|
|
private ResourceManagerWrapper resourceManagerWrapper;
|
|
|
|
|
@@ -103,12 +106,14 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
|
|
|
/**
|
|
|
* @param testName name of the test
|
|
|
- * @param noOfNodeManagers the number of node managers in the cluster
|
|
|
+ * @param numResourceManagers the number of resource managers in the cluster
|
|
|
+ * @param numNodeManagers the number of node managers in the cluster
|
|
|
* @param numLocalDirs the number of nm-local-dirs per nodemanager
|
|
|
* @param numLogDirs the number of nm-log-dirs per nodemanager
|
|
|
*/
|
|
|
- public MiniYARNCluster(String testName, int noOfNodeManagers,
|
|
|
- int numLocalDirs, int numLogDirs) {
|
|
|
+ public MiniYARNCluster(
|
|
|
+ String testName, int numResourceManagers, int numNodeManagers,
|
|
|
+ int numLocalDirs, int numLogDirs) {
|
|
|
super(testName.replace("$", ""));
|
|
|
this.numLocalDirs = numLocalDirs;
|
|
|
this.numLogDirs = numLogDirs;
|
|
@@ -157,28 +162,103 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
this.testWorkDir = targetWorkDir;
|
|
|
}
|
|
|
|
|
|
- resourceManagerWrapper = new ResourceManagerWrapper();
|
|
|
- addService(resourceManagerWrapper);
|
|
|
- nodeManagers = new CustomNodeManager[noOfNodeManagers];
|
|
|
- for(int index = 0; index < noOfNodeManagers; index++) {
|
|
|
+ resourceManagers = new ResourceManager[numResourceManagers];
|
|
|
+ for (int i = 0; i < numResourceManagers; i++) {
|
|
|
+ resourceManagers[i] = new ResourceManager();
|
|
|
+ addService(new ResourceManagerWrapper(i));
|
|
|
+ }
|
|
|
+ nodeManagers = new CustomNodeManager[numNodeManagers];
|
|
|
+ for(int index = 0; index < numNodeManagers; index++) {
|
|
|
addService(new NodeManagerWrapper(index));
|
|
|
nodeManagers[index] = new CustomNodeManager();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- @Override
|
|
|
+
|
|
|
+  /**
|
|
|
+   * Creates a cluster with a single ResourceManager by delegating to the
|
|
|
+   * five-argument constructor with numResourceManagers set to 1.
|
|
|
+   *
|
|
|
+   * @param testName name of the test
|
|
|
+   * @param numNodeManagers the number of node managers in the cluster
|
|
|
+   * @param numLocalDirs the number of nm-local-dirs per nodemanager
|
|
|
+   * @param numLogDirs the number of nm-log-dirs per nodemanager
|
|
|
+   */
|
|
|
+  public MiniYARNCluster(String testName, int numNodeManagers,
|
|
|
+                         int numLocalDirs, int numLogDirs) {
|
|
|
+    this(testName, 1, numNodeManagers, numLocalDirs, numLogDirs);
|
|
|
+  }
|
|
|
+
|
|
|
+  /**
|
|
|
+   * Initializes the composite service. When more than one ResourceManager
|
|
|
+   * was requested, RM HA is switched on in {@code conf} and the RM ids
|
|
|
+   * "rm0", "rm1", ... are published as a comma-separated list before the
|
|
|
+   * configuration is handed to the parent class.
|
|
|
+   *
|
|
|
+   * @param conf configuration to initialize with; modified in place when
|
|
|
+   *             HA settings are added
|
|
|
+   */
|
|
|
+  @Override
|
|
|
public void serviceInit(Configuration conf) throws Exception {
|
|
|
-    super.serviceInit(conf instanceof YarnConfiguration ? conf
|
|
|
-                                                         : new YarnConfiguration(
|
|
|
-                                                             conf));
|
|
|
+    final int rmCount = resourceManagers.length;
|
|
|
+    if (rmCount > 1) {
|
|
|
+      conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
|
|
+
|
|
|
+      // Build "rm0,rm1,...": every id except the first is preceded by a comma.
|
|
|
+      StringBuilder ids = new StringBuilder();
|
|
|
+      for (int idx = 0; idx < rmCount; idx++) {
|
|
|
+        ids.append(idx == 0 ? "" : ",").append("rm").append(idx);
|
|
|
+      }
|
|
|
+      conf.set(YarnConfiguration.RM_HA_IDS, ids.toString());
|
|
|
+    }
|
|
|
+
|
|
|
+    // The parent is always given a YarnConfiguration; wrap only if needed.
|
|
|
+    Configuration yarnConf = conf;
|
|
|
+    if (!(conf instanceof YarnConfiguration)) {
|
|
|
+      yarnConf = new YarnConfiguration(conf);
|
|
|
+    }
|
|
|
+    super.serviceInit(yarnConf);
|
|
|
}
|
|
|
|
|
|
public File getTestWorkDir() {
|
|
|
return testWorkDir;
|
|
|
}
|
|
|
|
|
|
+  /**
|
|
|
+   * In an HA cluster, go through all the RMs and find the Active RM. If none
|
|
|
+   * of them are active, wait up to 5 seconds for them to transition to Active.
|
|
|
+   *
|
|
|
+   * In a non-HA cluster, return the index of the only RM.
|
|
|
+   *
|
|
|
+   * @return index of the active RM, or -1 if no RM became active in time
|
|
|
+   * @throws YarnRuntimeException if the status of an RM cannot be read, or
|
|
|
+   *         if the calling thread is interrupted while waiting
|
|
|
+   */
|
|
|
+  @InterfaceAudience.Private
|
|
|
+  @VisibleForTesting
|
|
|
+  int getActiveRMIndex() {
|
|
|
+    if (resourceManagers.length == 1) {
|
|
|
+      return 0;
|
|
|
+    }
|
|
|
+
|
|
|
+    int numRetriesForRMBecomingActive = 5;
|
|
|
+    while (numRetriesForRMBecomingActive-- > 0) {
|
|
|
+      for (int i = 0; i < resourceManagers.length; i++) {
|
|
|
+        try {
|
|
|
+          if (HAServiceProtocol.HAServiceState.ACTIVE ==
|
|
|
+              resourceManagers[i].getRMContext().getRMAdminService()
|
|
|
+                  .getServiceStatus().getState()) {
|
|
|
+            return i;
|
|
|
+          }
|
|
|
+        } catch (IOException e) {
|
|
|
+          throw new YarnRuntimeException("Couldn't read the status of " +
|
|
|
+              "a ResourceManager in the HA ensemble.", e);
|
|
|
+        }
|
|
|
+      }
|
|
|
+      try {
|
|
|
+        // One-second pause between polls; five polls give the 5s budget.
|
|
|
+        Thread.sleep(1000);
|
|
|
+      } catch (InterruptedException e) {
|
|
|
+        // Restore the interrupt flag so callers can still observe it, and
|
|
|
+        // keep the original exception attached as the cause.
|
|
|
+        Thread.currentThread().interrupt();
|
|
|
+        throw new YarnRuntimeException("Interrupted while waiting for one " +
|
|
|
+            "of the ResourceManagers to become active", e);
|
|
|
+      }
|
|
|
+    }
|
|
|
+    return -1;
|
|
|
+  }
|
|
|
+
|
|
|
+  /**
|
|
|
+   * Returns the currently active {@link ResourceManager}. The active index
|
|
|
+   * is looked up exactly once, so the "none active" check and the array
|
|
|
+   * access always agree and the lookup's wait is not repeated.
|
|
|
+   *
|
|
|
+   * @return the active {@link ResourceManager} of the cluster,
|
|
|
+   * null if none of them are active.
|
|
|
+   */
|
|
|
public ResourceManager getResourceManager() {
|
|
|
-    return this.resourceManager;
|
|
|
+    int activeRMIndex = getActiveRMIndex();
|
|
|
+    return activeRMIndex == -1
|
|
|
+        ? null
|
|
|
+        : this.resourceManagers[activeRMIndex];
|
|
|
+  }
|
|
|
+
|
|
|
+  /**
|
|
|
+   * @param i index into the cluster's array of ResourceManagers
|
|
|
+   * @return the {@link ResourceManager} stored at index i; the HA state of
|
|
|
+   *         that RM is not consulted and no bounds check is done here
|
|
|
+   */
|
|
|
+  public ResourceManager getResourceManager(int i) {
|
|
|
+    return this.resourceManagers[i];
|
|
|
}
|
|
|
|
|
|
public NodeManager getNodeManager(int i) {
|
|
@@ -195,8 +275,29 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
}
|
|
|
|
|
|
private class ResourceManagerWrapper extends AbstractService {
|
|
|
- public ResourceManagerWrapper() {
|
|
|
- super(ResourceManagerWrapper.class.getName());
|
|
|
+ private int index;
|
|
|
+
|
|
|
+    /**
|
|
|
+     * @param i index of the RM this wrapper manages; stored in the index
|
|
|
+     *          field and appended to the service name after an underscore
|
|
|
+     */
|
|
|
+    public ResourceManagerWrapper(int i) {
|
|
|
+      super(ResourceManagerWrapper.class.getName() + "_" + i);
|
|
|
+      index = i;
|
|
|
+    }
|
|
|
+
|
|
|
+    /**
|
|
|
+     * Sets the RM, admin, scheduler and resource-tracker addresses, and the
|
|
|
+     * RM web app host/port, to the value of MiniYARNCluster.getHostname()
|
|
|
+     * with port 0 (the inline code this replaces described that as picking
|
|
|
+     * free random ports).
|
|
|
+     *
|
|
|
+     * @param conf configuration that is modified in place
|
|
|
+     */
|
|
|
+    private void setNonHARMConfiguration(Configuration conf) {
|
|
|
+      String hostname = MiniYARNCluster.getHostname();
|
|
|
+      conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0");
|
|
|
+      conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0");
|
|
|
+      conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0");
|
|
|
+      conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0");
|
|
|
+      WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0);
|
|
|
+    }
|
|
|
+
|
|
|
+    /**
|
|
|
+     * Sets RM_HA_ID to "rm" followed by this wrapper's index and, for each
|
|
|
+     * key in RM_RPC_ADDRESS_CONF_KEYS, sets the key suffixed with that id
|
|
|
+     * (via HAUtil.addSuffix) to the local hostname with port 0.
|
|
|
+     *
|
|
|
+     * NOTE(review): no web app address is set here, although the non-HA
|
|
|
+     * path sets one -- confirm whether RM_RPC_ADDRESS_CONF_KEYS covers it.
|
|
|
+     *
|
|
|
+     * @param conf configuration that is modified in place
|
|
|
+     */
|
|
|
+    private void setHARMConfiguration(Configuration conf) {
|
|
|
+      String rmId = "rm" + index;
|
|
|
+      String hostname = MiniYARNCluster.getHostname();
|
|
|
+      conf.set(YarnConfiguration.RM_HA_ID, rmId);
|
|
|
+      for (String confKey : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
|
|
|
+        conf.set(HAUtil.addSuffix(confKey, rmId), hostname + ":0");
|
|
|
+      }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -206,22 +307,15 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
if (!conf.getBoolean(
|
|
|
YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS,
|
|
|
YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS)) {
|
|
|
- // pick free random ports.
|
|
|
- String hostname = MiniYARNCluster.getHostname();
|
|
|
- conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0");
|
|
|
- conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0");
|
|
|
- conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0");
|
|
|
- conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0");
|
|
|
- WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0);
|
|
|
+ if (HAUtil.isHAEnabled(conf)) {
|
|
|
+ setHARMConfiguration(conf);
|
|
|
+ } else {
|
|
|
+ setNonHARMConfiguration(conf);
|
|
|
+ }
|
|
|
}
|
|
|
- resourceManager = new ResourceManager() {
|
|
|
- @Override
|
|
|
- protected void doSecureLogin() throws IOException {
|
|
|
- // Don't try to login using keytab in the testcase.
|
|
|
- };
|
|
|
- };
|
|
|
- resourceManager.init(conf);
|
|
|
- resourceManager.getRMContext().getDispatcher().register(RMAppAttemptEventType.class,
|
|
|
+ resourceManagers[index].init(conf);
|
|
|
+ resourceManagers[index].getRMContext().getDispatcher().register
|
|
|
+ (RMAppAttemptEventType.class,
|
|
|
new EventHandler<RMAppAttemptEvent>() {
|
|
|
public void handle(RMAppAttemptEvent event) {
|
|
|
if (event instanceof RMAppAttemptRegistrationEvent) {
|
|
@@ -239,20 +333,20 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
try {
|
|
|
new Thread() {
|
|
|
public void run() {
|
|
|
- resourceManager.start();
|
|
|
- };
|
|
|
+ resourceManagers[index].start();
|
|
|
+ }
|
|
|
}.start();
|
|
|
int waitCount = 0;
|
|
|
- while (resourceManager.getServiceState() == STATE.INITED
|
|
|
+ while (resourceManagers[index].getServiceState() == STATE.INITED
|
|
|
&& waitCount++ < 60) {
|
|
|
LOG.info("Waiting for RM to start...");
|
|
|
Thread.sleep(1500);
|
|
|
}
|
|
|
- if (resourceManager.getServiceState() != STATE.STARTED) {
|
|
|
+ if (resourceManagers[index].getServiceState() != STATE.STARTED) {
|
|
|
// RM could have failed.
|
|
|
throw new IOException(
|
|
|
"ResourceManager failed to start. Final state is "
|
|
|
- + resourceManager.getServiceState());
|
|
|
+ + resourceManagers[index].getServiceState());
|
|
|
}
|
|
|
super.serviceStart();
|
|
|
} catch (Throwable t) {
|
|
@@ -278,9 +372,9 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
|
|
|
@Override
|
|
|
protected synchronized void serviceStop() throws Exception {
|
|
|
- if (resourceManager != null) {
|
|
|
+ if (resourceManagers[index] != null) {
|
|
|
waitForAppMastersToFinish(5000);
|
|
|
- resourceManager.stop();
|
|
|
+ resourceManagers[index].stop();
|
|
|
}
|
|
|
super.serviceStop();
|
|
|
|
|
@@ -372,7 +466,7 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
new Thread() {
|
|
|
public void run() {
|
|
|
nodeManagers[index].start();
|
|
|
- };
|
|
|
+ }
|
|
|
}.start();
|
|
|
int waitCount = 0;
|
|
|
while (nodeManagers[index].getServiceState() == STATE.INITED
|
|
@@ -398,12 +492,12 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
super.serviceStop();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private class CustomNodeManager extends NodeManager {
|
|
|
@Override
|
|
|
protected void doSecureLogin() throws IOException {
|
|
|
// Don't try to login using keytab in the testcase.
|
|
|
- };
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
|
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
|
@@ -412,8 +506,8 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
healthChecker, metrics) {
|
|
|
@Override
|
|
|
protected ResourceTracker getRMClient() {
|
|
|
- final ResourceTrackerService rt = resourceManager
|
|
|
- .getResourceTrackerService();
|
|
|
+ final ResourceTrackerService rt =
|
|
|
+ getResourceManager().getResourceTrackerService();
|
|
|
final RecordFactory recordFactory =
|
|
|
RecordFactoryProvider.getRecordFactory(null);
|
|
|
|
|
@@ -424,8 +518,7 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
public NodeHeartbeatResponse nodeHeartbeat(
|
|
|
NodeHeartbeatRequest request) throws YarnException,
|
|
|
IOException {
|
|
|
- NodeHeartbeatResponse response = recordFactory.newRecordInstance(
|
|
|
- NodeHeartbeatResponse.class);
|
|
|
+ NodeHeartbeatResponse response;
|
|
|
try {
|
|
|
response = rt.nodeHeartbeat(request);
|
|
|
} catch (YarnException e) {
|
|
@@ -440,8 +533,7 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
public RegisterNodeManagerResponse registerNodeManager(
|
|
|
RegisterNodeManagerRequest request)
|
|
|
throws YarnException, IOException {
|
|
|
- RegisterNodeManagerResponse response = recordFactory.
|
|
|
- newRecordInstance(RegisterNodeManagerResponse.class);
|
|
|
+ RegisterNodeManagerResponse response;
|
|
|
try {
|
|
|
response = rt.registerNodeManager(request);
|
|
|
} catch (YarnException e) {
|
|
@@ -452,13 +544,11 @@ public class MiniYARNCluster extends CompositeService {
|
|
|
return response;
|
|
|
}
|
|
|
};
|
|
|
- };
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
|
- protected void stopRMProxy() {
|
|
|
- return;
|
|
|
- }
|
|
|
+ protected void stopRMProxy() { }
|
|
|
};
|
|
|
- };
|
|
|
+ }
|
|
|
}
|
|
|
}
|