@@ -7,7 +7,9 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -26,11 +28,12 @@ import org.apache.hadoop.util.Shell.ShellCommandExecutor;
  * Following will be the format which the final command execution would look :
  * <br/>
  * <code>
- * ssh master-host 'hadoop-home/bin/hadoop-daemon.sh --script scriptName
- *   --config HADOOP_CONF_DIR (start|stop) masterCommand'
+ * ssh host 'hadoop-home/bin/hadoop-daemon.sh --script scriptName
+ *   --config HADOOP_CONF_DIR (start|stop) command'
  * </code>
  */
-public class HadoopDaemonRemoteCluster implements ClusterProcessManager {
+public abstract class HadoopDaemonRemoteCluster
+    implements ClusterProcessManager {

   private static final Log LOG = LogFactory
       .getLog(HadoopDaemonRemoteCluster.class.getName());
@@ -53,62 +56,46 @@ public class HadoopDaemonRemoteCluster implements ClusterProcessManager {
   private String hadoopHome;
   private String hadoopConfDir;
   private String deployed_hadoopConfDir;
-  private String masterCommand;
-  private String slaveCommand;
+  private final Set<Enum<?>> roles;

-  private RemoteProcess master;
-  private Map<String, RemoteProcess> slaves;
+  private final List<HadoopDaemonInfo> daemonInfos;
+  private List<RemoteProcess> processes;
+
+  public static class HadoopDaemonInfo {
+    public final String cmd;
+    public final Enum<?> role;
+    public final String hostFile;
+    public HadoopDaemonInfo(String cmd, Enum<?> role, String hostFile) {
+      super();
+      this.cmd = cmd;
+      this.role = role;
+      this.hostFile = hostFile;
+    }
+  }
+
+  public HadoopDaemonRemoteCluster(List<HadoopDaemonInfo> daemonInfos) {
+    this.daemonInfos = daemonInfos;
+    this.roles = new HashSet<Enum<?>>();
+    for (HadoopDaemonInfo info : daemonInfos) {
+      this.roles.add(info.role);
+    }
+  }

   @Override
-  public void init(ClusterType t, Configuration conf) throws Exception {
-    /*
-     * Initialization strategy of the HadoopDaemonRemoteCluster is three staged
-     * process: 1. Populate script names based on the type of passed cluster. 2.
-     * Populate the required directories. 3. Populate the master and slaves.
-     */
-    populateScriptNames(t);
+  public void init(Configuration conf) throws IOException {
     populateDirectories(conf);
-    this.slaves = new HashMap<String, RemoteProcess>();
+    this.processes = new ArrayList<RemoteProcess>();
     populateDaemons(deployed_hadoopConfDir);
   }

-  /**
-   * Method to populate the required master and slave commands which are used to
-   * manage the cluster.<br/>
-   *
-   * @param t
-   *          type of cluster to be initialized.
-   *
-   * @throws UnsupportedOperationException
-   *           if the passed cluster type is not MAPRED or HDFS
-   */
-  private void populateScriptNames(ClusterType t) {
-    switch (t) {
-    case MAPRED:
-      masterCommand = "jobtracker";
-      slaveCommand = "tasktracker";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Created mapred hadoop daemon remote cluster manager with "
-            + "scriptName: mapred, masterCommand: jobtracker, "
-            + "slaveCommand: tasktracker");
-      }
-      break;
-    case HDFS:
-      masterCommand = "namenode";
-      slaveCommand = "datanode";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Created hdfs hadoop daemon remote cluster manager with "
-            + "scriptName: hdfs, masterCommand: namenode, "
-            + "slaveCommand: datanode");
-      }
-      break;
-    default:
-      LOG.error("Cluster type :" + t
-          + "is not supported currently by HadoopDaemonRemoteCluster");
-      throw new UnsupportedOperationException(
-          "The specified cluster type is not supported by the " +
-          "HadoopDaemonRemoteCluster");
-    }
+  @Override
+  public List<RemoteProcess> getAllProcesses() {
+    return processes;
+  }
+
+  @Override
+  public Set<Enum<?>> getRoles() {
+    return roles;
   }

   /**
@@ -116,14 +103,14 @@ public class HadoopDaemonRemoteCluster implements ClusterProcessManager {
    *
    * @param conf
    *          Configuration object containing values for
-   *          TEST_SYSTEM_HADOOPHOME_CONF_KEY and
-   *          TEST_SYSTEM_HADOOPCONFDIR_CONF_KEY
+   *          CONF_HADOOPHOME and
+   *          CONF_HADOOPCONFDIR
    *
    * @throws IllegalArgumentException
    *           if the configuration or system property set does not contain
    *           values for the required keys.
    */
-  private void populateDirectories(Configuration conf) {
+  protected void populateDirectories(Configuration conf) {
     hadoopHome = conf.get(CONF_HADOOPHOME, System
         .getProperty(CONF_HADOOPHOME));
     hadoopConfDir = conf.get(CONF_HADOOPCONFDIR, System
@@ -146,70 +133,58 @@ public class HadoopDaemonRemoteCluster implements ClusterProcessManager {

   }

-  @Override
-  public RemoteProcess getMaster() {
-    return master;
-  }
-
-  @Override
-  public Map<String, RemoteProcess> getSlaves() {
-    return slaves;
-  }
-
   @Override
   public void start() throws IOException {
-    // start master first.
-    master.start();
-    for (RemoteProcess slave : slaves.values()) {
-      slave.start();
+    for (RemoteProcess process : processes) {
+      process.start();
     }
   }

   @Override
   public void stop() throws IOException {
-    master.kill();
-    for (RemoteProcess slave : slaves.values()) {
-      slave.kill();
+    for (RemoteProcess process : processes) {
+      process.kill();
     }
   }

-  private void populateDaemons(String confLocation) throws IOException {
-    File mastersFile = new File(confLocation, "masters");
-    File slavesFile = new File(confLocation, "slaves");
+  protected void populateDaemon(String confLocation,
+      HadoopDaemonInfo info) throws IOException {
+    File hostFile = new File(confLocation, info.hostFile);
     BufferedReader reader = null;
+    reader = new BufferedReader(new FileReader(hostFile));
+    String host = null;
     try {
-      reader = new BufferedReader(new FileReader(mastersFile));
-      String masterHost = null;
-      masterHost = reader.readLine();
-      if (masterHost != null && !masterHost.trim().isEmpty()) {
-        master = new ScriptDaemon(masterCommand, masterHost);
-      }
-    } finally {
-      try {
-        reader.close();
-      } catch (Exception e) {
-        LOG.error("Can't read masters file from " + confLocation);
+      boolean foundAtLeastOne = false;
+      while ((host = reader.readLine()) != null) {
+        if (host.trim().isEmpty()) {
+          throw new IllegalArgumentException(
+              "Hostname could not be found in file " + info.hostFile);
+        }
+        InetAddress addr = InetAddress.getByName(host);
+        RemoteProcess process = new ScriptDaemon(info.cmd,
+            addr.getCanonicalHostName(), info.role);
+        processes.add(process);
+        foundAtLeastOne = true;
       }
-
-    }
-    try {
-      reader = new BufferedReader(new FileReader(slavesFile));
-      String slaveHost = null;
-      while ((slaveHost = reader.readLine()) != null) {
-        InetAddress addr = InetAddress.getByName(slaveHost);
-        RemoteProcess slave = new ScriptDaemon(slaveCommand,
-            addr.getCanonicalHostName());
-        slaves.put(addr.getCanonicalHostName(), slave);
+      if (!foundAtLeastOne) {
+        throw new IllegalArgumentException("At least one hostname " +
+            "is required to be present in file - " + info.hostFile);
       }
     } finally {
       try {
         reader.close();
       } catch (Exception e) {
-        LOG.error("Can't read slaves file from " + confLocation);
+        LOG.warn("Could not close reader");
       }
     }
   }

+  protected void populateDaemons(String confLocation) throws IOException {
+    for (HadoopDaemonInfo info : daemonInfos) {
+      populateDaemon(confLocation, info);
+    }
+  }
+
   /**
    * The core daemon class which actually implements the remote process
    * management of actual daemon processes in the cluster.
@@ -222,10 +197,12 @@ public class HadoopDaemonRemoteCluster implements ClusterProcessManager {
     private static final String SCRIPT_NAME = "hadoop-daemon.sh";
     private final String daemonName;
     private final String hostName;
+    private final Enum<?> role;

-    public ScriptDaemon(String daemonName, String hostName) {
+    public ScriptDaemon(String daemonName, String hostName, Enum<?> role) {
       this.daemonName = daemonName;
       this.hostName = hostName;
+      this.role = role;
     }

     @Override
@@ -272,6 +249,10 @@ public class HadoopDaemonRemoteCluster implements ClusterProcessManager {
     public void start() throws IOException {
       buildCommandExecutor(START_COMMAND).execute();
     }
-  }

+    @Override
+    public Enum<?> getRole() {
+      return role;
+    }
+  }
 }
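
With this change HadoopDaemonRemoteCluster becomes an abstract base: instead of master/slave commands hard-coded per ClusterType, a concrete subclass passes a List<HadoopDaemonInfo> (daemon command, role enum, host file) to the constructor, and init(Configuration) expands each host file into one ScriptDaemon per host. Below is a minimal sketch of such a subclass. The MRCluster class name, the Role enum, and the main() driver are hypothetical illustrations of the new constructor contract only; the sketch assumes ClusterProcessManager declares no methods beyond those the base class already implements, and that CONF_HADOOPHOME / CONF_HADOOPCONFDIR are supplied via the Configuration or as system properties.

// Hypothetical example (not part of this patch): a concrete subclass wiring
// up the MapReduce daemons through the HadoopDaemonInfo-based constructor.
// Package declaration and import of HadoopDaemonRemoteCluster are omitted;
// assume the subclass lives in the same package as the base class.
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;

public class MRCluster extends HadoopDaemonRemoteCluster {

  // Roles are ordinary enums; the base class only requires Enum<?>.
  public enum Role { JT, TT }

  // One HadoopDaemonInfo per daemon type: the command passed to
  // hadoop-daemon.sh, its role, and the conf-dir host file listing its hosts.
  private static final List<HadoopDaemonInfo> DAEMONS = Arrays.asList(
      new HadoopDaemonInfo("jobtracker", Role.JT, "masters"),
      new HadoopDaemonInfo("tasktracker", Role.TT, "slaves"));

  public MRCluster() {
    super(DAEMONS);
  }

  public static void main(String[] args) throws Exception {
    // CONF_HADOOPHOME and CONF_HADOOPCONFDIR must be present in the
    // Configuration or as system properties, otherwise init() throws
    // IllegalArgumentException.
    MRCluster cluster = new MRCluster();
    cluster.init(new Configuration()); // reads the masters/slaves host files
    cluster.start();                   // ssh + hadoop-daemon.sh start per host
    System.out.println("Managing " + cluster.getAllProcesses().size()
        + " daemons with roles " + cluster.getRoles());
    cluster.stop();                    // ssh + hadoop-daemon.sh stop per host
  }
}

Since start() and stop() now simply iterate over the RemoteProcess list, supporting an additional daemon type only requires another HadoopDaemonInfo entry rather than a new ClusterType branch.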