Browse Source

HADOOP-6392. Run namenode and jobtracker on separate EC2 instances.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@888998 13f79535-47bb-0310-9956-ffa450edef68
Thomas White 15 years ago
parent
commit
2e5fd1c0ab

+ 3 - 0
CHANGES.txt

@@ -16,6 +16,9 @@ Trunk (unreleased changes)
 
     HADOOP-6108. Add support for EBS storage on EC2. (tomwhite)
 
+    HADOOP-6392. Run namenode and jobtracker on separate EC2 instances.
+    (tomwhite)
+
   IMPROVEMENTS
 
     HADOOP-6283. Improve the exception messages thrown by

+ 22 - 11
src/contrib/cloud/README.txt

@@ -61,8 +61,18 @@ type:
 
 % hadoop-ec2 launch-cluster my-hadoop-cluster 10
 
-This will boot the master node and 10 worker nodes. When the nodes have started
-and the Hadoop cluster has come up, the console will display a message like
+This will boot the master node and 10 worker nodes. The master node runs the
+namenode, secondary namenode, and jobtracker, and each worker node runs a
+datanode and a tasktracker. Equivalently the cluster could be launched as:
+
+% hadoop-ec2 launch-cluster my-hadoop-cluster 1 nn,snn,jt 10 dn,tt
+
+Note that using this notation you can launch a split namenode/jobtracker cluster
+
+% hadoop-ec2 launch-cluster my-hadoop-cluster 1 nn,snn 1 jt 10 dn,tt
+
+When the nodes have started and the Hadoop cluster has come up, the console will
+display a message like
 
   Browse the cluster at http://ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com/
 
@@ -70,7 +80,8 @@ You can access Hadoop's web UI by visiting this URL. By default, port 80 is
 opened for access from your client machine. You may change the firewall settings
 (to allow access from a network, rather than just a single machine, for example)
 by using the Amazon EC2 command line tools, or by using a tool like Elastic Fox.
-The security group to change is the one named <cluster-name>-master.
+There is a security group for each node's role. The one for the namenode
+is <cluster-name>-nn, for example.
 
 For security reasons, traffic from the network your client is running on is
 proxied through the master node of the cluster using an SSH tunnel (a SOCKS
@@ -109,13 +120,13 @@ have to do the formatting once.
 
 % hadoop-ec2 create-formatted-snapshot my-ebs-cluster 100
 
-We create storage for a single master and for two slaves. The volumes to create
-are described in a JSON spec file, which references the snapshot we just
+We create storage for a single namenode and for two datanodes. The volumes to
+create are described in a JSON spec file, which references the snapshot we just
 created. Here is the contents of a JSON file, called
 my-ebs-cluster-storage-spec.json:
 
 {
-  "master": [
+  "nn": [
     {
       "device": "/dev/sdj",
       "mount_point": "/ebs1",
@@ -129,7 +140,7 @@ my-ebs-cluster-storage-spec.json:
       "snapshot_id": "snap-268e704f"
     }
   ],
-  "slave": [
+  "dn": [
     {
       "device": "/dev/sdj",
       "mount_point": "/ebs1",
@@ -146,7 +157,7 @@ my-ebs-cluster-storage-spec.json:
 }
 
 
-Each role (here "master" and "slave") is the key to an array of volume
+Each role (here "nn" and "dn") is the key to an array of volume
 specifications. In this example, the "slave" role has two devices ("/dev/sdj"
 and "/dev/sdk") with different mount points, sizes, and generated from an EBS
 snapshot. The snapshot is the formatted snapshot created earlier, so that the
@@ -155,9 +166,9 @@ of the snapshot created earlier.
 
 Let's create actual volumes using this file.
 
-% hadoop-ec2 create-storage my-ebs-cluster master 1 \
+% hadoop-ec2 create-storage my-ebs-cluster nn 1 \
     my-ebs-cluster-storage-spec.json
-% hadoop-ec2 create-storage my-ebs-cluster slave 2 \
+% hadoop-ec2 create-storage my-ebs-cluster dn 2 \
     my-ebs-cluster-storage-spec.json
 
 Now let's start the cluster with 2 slave nodes:
@@ -214,7 +225,7 @@ Running a job is straightforward:
 
 Of course, these examples assume that you have installed Hadoop on your local
 machine. It is also possible to launch jobs from within the cluster. First log
-into the master node:
+into the namenode:
 
 % hadoop-ec2 login my-hadoop-cluster
 

+ 2 - 2
src/contrib/cloud/src/integration-test/ebs-storage-spec.json

@@ -1,5 +1,5 @@
 {
-  "master": [
+  "nn": [
     {
       "device": "/dev/sdj",
       "mount_point": "/ebs1",
@@ -13,7 +13,7 @@
       "snapshot_id": "snap-fe44bb97"
     }
   ],
-  "slave": [
+  "dn": [
     {
       "device": "/dev/sdj",
       "mount_point": "/ebs1",

+ 2 - 2
src/contrib/cloud/src/integration-test/persistent-cluster.sh

@@ -71,10 +71,10 @@ fi
 
 # Create storage
 $HADOOP_CLOUD_SCRIPT create-storage --config-dir=$CONFIG_DIR \
-  --availability-zone=$AVAILABILITY_ZONE $CLUSTER master 1 \
+  --availability-zone=$AVAILABILITY_ZONE $CLUSTER nn 1 \
   $bin/ebs-storage-spec.json
 $HADOOP_CLOUD_SCRIPT create-storage --config-dir=$CONFIG_DIR \
-  --availability-zone=$AVAILABILITY_ZONE $CLUSTER slave 1 \
+  --availability-zone=$AVAILABILITY_ZONE $CLUSTER dn 1 \
   $bin/ebs-storage-spec.json
 
 # Launch a cluster

+ 7 - 1
src/contrib/cloud/src/integration-test/transient-cluster.sh

@@ -41,6 +41,7 @@ HADOOP_CLOUD_HOME=${HADOOP_CLOUD_HOME:-$bin/../py}
 HADOOP_CLOUD_PROVIDER=${HADOOP_CLOUD_PROVIDER:-ec2}
 SSH_OPTIONS=${SSH_OPTIONS:-"-i ~/.$HADOOP_CLOUD_PROVIDER/id_rsa-$KEY_NAME \
   -o StrictHostKeyChecking=no"}
+LAUNCH_ARGS=${LAUNCH_ARGS:-1} # Try LAUNCH_ARGS="1 nn,snn 1 jt 1 dn,tt"
 
 HADOOP_CLOUD_SCRIPT=$HADOOP_CLOUD_HOME/hadoop-$HADOOP_CLOUD_PROVIDER
 export HADOOP_CONF_DIR=$CONFIG_DIR/$CLUSTER
@@ -56,7 +57,12 @@ fi
 # Launch a cluster
 $HADOOP_CLOUD_SCRIPT launch-cluster --config-dir=$CONFIG_DIR \
   --image-id=$IMAGE_ID --key-name=$KEY_NAME --auto-shutdown=$AUTO_SHUTDOWN \
-  --availability-zone=$AVAILABILITY_ZONE $CLIENT_CIDRS $ENVS $CLUSTER 1
+  --availability-zone=$AVAILABILITY_ZONE $CLIENT_CIDRS $ENVS $CLUSTER \
+  $LAUNCH_ARGS
+  
+# List clusters
+$HADOOP_CLOUD_SCRIPT list --config-dir=$CONFIG_DIR
+$HADOOP_CLOUD_SCRIPT list --config-dir=$CONFIG_DIR $CLUSTER
 
 # Run a proxy and save its pid in HADOOP_CLOUD_PROXY_PID
 eval `$HADOOP_CLOUD_SCRIPT proxy --config-dir=$CONFIG_DIR \

+ 42 - 24
src/contrib/cloud/src/py/hadoop-ec2-init-remote.sh

@@ -30,16 +30,20 @@
 # Substitute environment variables passed by the client
 export %ENV%
 
-if [ -z "$MASTER_HOST" ]; then
-  IS_MASTER=true
-  MASTER_HOST=`wget -q -O - http://169.254.169.254/latest/meta-data/public-hostname`
-else
-  IS_MASTER=false
-fi
-
 HADOOP_VERSION=${HADOOP_VERSION:-0.20.1}
 HADOOP_HOME=/usr/local/hadoop-$HADOOP_VERSION
 HADOOP_CONF_DIR=$HADOOP_HOME/conf
+SELF_HOST=`wget -q -O - http://169.254.169.254/latest/meta-data/public-hostname`
+for role in $(echo "$ROLES" | tr "," "\n"); do
+  case $role in
+  nn)
+    NN_HOST=$SELF_HOST
+    ;;
+  jt)
+    JT_HOST=$SELF_HOST
+    ;;
+  esac
+done
 
 function register_auto_shutdown() {
   if [ ! -z "$AUTO_SHUTDOWN" ]; then
@@ -237,6 +241,9 @@ function configure_hadoop() {
   # Create tmp directory
   mkdir /mnt/tmp
   chmod a+rwxt /mnt/tmp
+  
+  mkdir /etc/hadoop
+  ln -s $HADOOP_CONF_DIR /etc/hadoop/conf
 
   ##############################################################################
   # Modify this section to customize your Hadoop cluster.
@@ -301,7 +308,7 @@ function configure_hadoop() {
 </property>
 <property>
   <name>fs.default.name</name>
-  <value>hdfs://$MASTER_HOST:8020/</value>
+  <value>hdfs://$NN_HOST:8020/</value>
 </property>
 <property>
   <name>fs.trash.interval</name>
@@ -328,7 +335,7 @@ function configure_hadoop() {
 </property>
 <property>
   <name>mapred.job.tracker</name>
-  <value>$MASTER_HOST:8021</value>
+  <value>$JT_HOST:8021</value>
 </property>
 <property>
   <name>mapred.job.tracker.handler.count</name>
@@ -473,8 +480,8 @@ Auto-Configuration (PAC)</a> file.  To manage multiple proxy configurations,
 you may wish to use
 <a href="https://addons.mozilla.org/en-US/firefox/addon/2464">FoxyProxy</a>.
 <ul>
-<li><a href="http://$MASTER_HOST:50070/">NameNode</a>
-<li><a href="http://$MASTER_HOST:50030/">JobTracker</a>
+<li><a href="http://$NN_HOST:50070/">NameNode</a>
+<li><a href="http://$JT_HOST:50030/">JobTracker</a>
 </ul>
 </body>
 </html>
@@ -484,7 +491,7 @@ END
 
 }
 
-function start_hadoop_master() {
+function start_namenode() {
   if which dpkg &> /dev/null; then
     AS_HADOOP="su -s /bin/bash - hadoop -c"
   elif which rpm &> /dev/null; then
@@ -495,8 +502,6 @@ function start_hadoop_master() {
   [ ! -e $FIRST_MOUNT/hadoop/hdfs ] && $AS_HADOOP "$HADOOP_HOME/bin/hadoop namenode -format"
 
   $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start namenode"
-  $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start secondarynamenode"
-  $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start jobtracker"
 
   $AS_HADOOP "$HADOOP_HOME/bin/hadoop dfsadmin -safemode wait"
   $AS_HADOOP "$HADOOP_HOME/bin/hadoop fs -mkdir /user"
@@ -506,15 +511,13 @@ function start_hadoop_master() {
 
 }
 
-function start_hadoop_slave() {
+function start_daemon() {
   if which dpkg &> /dev/null; then
     AS_HADOOP="su -s /bin/bash - hadoop -c"
   elif which rpm &> /dev/null; then
     AS_HADOOP="/sbin/runuser -s /bin/bash - hadoop -c"
   fi
-
-  $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start datanode"
-  $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start tasktracker"
+  $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start $1"
 }
 
 register_auto_shutdown
@@ -522,9 +525,24 @@ install_user_packages
 install_hadoop
 configure_hadoop
 
-if $IS_MASTER ; then
-  setup_web
-  start_hadoop_master
-else
-  start_hadoop_slave
-fi
+for role in $(echo "$ROLES" | tr "," "\n"); do
+  case $role in
+  nn)
+    setup_web
+    start_namenode
+    ;;
+  snn)
+    start_daemon secondarynamenode
+    ;;
+  jt)
+    start_daemon jobtracker
+    ;;
+  dn)
+    start_daemon datanode
+    ;;
+  tt)
+    start_daemon tasktracker
+    ;;
+  esac
+done
+

+ 106 - 146
src/contrib/cloud/src/py/hadoop/cloud/cli.py

@@ -16,17 +16,20 @@
 from __future__ import with_statement
 
 import ConfigParser
-import hadoop.cloud.commands as commands
 from hadoop.cloud.cluster import get_cluster
-from hadoop.cloud.cluster import TimeoutException
-from hadoop.cloud.providers.ec2 import Ec2Storage
+from hadoop.cloud.service import InstanceTemplate
+from hadoop.cloud.service import HadoopService
+from hadoop.cloud.service import NAMENODE
+from hadoop.cloud.service import SECONDARY_NAMENODE
+from hadoop.cloud.service import JOBTRACKER
+from hadoop.cloud.service import DATANODE
+from hadoop.cloud.service import TASKTRACKER
 from hadoop.cloud.util import merge_config_with_options
 from hadoop.cloud.util import xstr
 import logging
 from optparse import OptionParser
 from optparse import make_option
 import os
-import subprocess
 import sys
 
 version_file = os.path.join(sys.path[0], "VERSION")
@@ -125,8 +128,8 @@ where COMMAND and [OPTIONS] may be one of:
                                         or instances in CLUSTER
   launch-master CLUSTER               launch or find a master in CLUSTER
   launch-slaves CLUSTER NUM_SLAVES    launch NUM_SLAVES slaves in CLUSTER
-  launch-cluster CLUSTER NUM_SLAVES   launch a master and NUM_SLAVES slaves
-                                        in CLUSTER
+  launch-cluster CLUSTER (NUM_SLAVES| launch a master and NUM_SLAVES slaves or
+    N ROLE [N ROLE ...])                N instances in ROLE in CLUSTER
   create-formatted-snapshot CLUSTER   create an empty, formatted snapshot of
     SIZE                                size SIZE GiB
   list-storage CLUSTER                list storage volumes for CLUSTER
@@ -147,6 +150,9 @@ where COMMAND and [OPTIONS] may be one of:
 Use %(script)s COMMAND --help to see additional options for specific commands.
 """ % locals()
 
+def print_deprecation(script, replacement):
+  print "Deprecated. Use '%(script)s %(replacement)s'." % locals()
+
 def parse_options_and_config(command, option_list=[], extra_arguments=(),
                              unbounded_args=False):
   """
@@ -176,8 +182,9 @@ def parse_options_and_config(command, option_list=[], extra_arguments=(),
   cluster_name = args[0]
   opt = merge_config_with_options(cluster_name, config, options_dict)
   logging.debug("Options: %s", str(opt))
-  return (opt, args, get_cluster(get_cloud_provider(opt))(cluster_name,
-                                                          config_dir))
+  cluster = get_cluster(get_cloud_provider(opt))(cluster_name, config_dir)
+  service = get_service(cluster)
+  return (opt, args, service)
 
 def parse_options(command, option_list=[], expected_arguments=(),
                   unbounded_args=False):
@@ -221,6 +228,9 @@ def get_cloud_provider(options_dict):
     provider = DEFAULT_CLOUD_PROVIDER
   return provider
 
+def get_service(cluster):
+  return HadoopService(cluster)
+
 def check_options_set(options, option_names):
   for option_name in option_names:
     if options.get(option_name) is None:
@@ -242,10 +252,6 @@ def get_image_id(cluster, options):
   else:
     return options.get('image_id')
 
-def _prompt(prompt):
-  """ Returns true if user responds "yes" to prompt. """
-  return raw_input("%s [yes or no]: " % prompt).lower() == "yes"
-
 def main():
   # Use HADOOP_CLOUD_LOGGING_LEVEL=DEBUG to enable debugging output.
   logging.basicConfig(level=getattr(logging,
@@ -261,194 +267,148 @@ def main():
   if command == 'list':
     (opt, args) = parse_options(command, BASIC_OPTIONS, unbounded_args=True)
     if len(args) == 0:
-      commands.list_all(get_cloud_provider(opt))
+      service = get_service(None)
+      service.list_all(get_cloud_provider(opt))
     else:
-      (opt, args, cluster) = parse_options_and_config(command, BASIC_OPTIONS)
-      commands.list_cluster(cluster)
+      (opt, args, service) = parse_options_and_config(command, BASIC_OPTIONS)
+      service.list()
 
   elif command == 'launch-master':
-    (opt, args, cluster) = parse_options_and_config(command, LAUNCH_OPTIONS)
-    check_launch_options_set(cluster, opt)
+    (opt, args, service) = parse_options_and_config(command, LAUNCH_OPTIONS)
+    check_launch_options_set(service.cluster, opt)
     config_dir = get_config_dir(opt)
-    commands.launch_master(cluster, config_dir, get_image_id(cluster, opt),
-      opt.get('instance_type'),
-      opt.get('key_name'), opt.get('public_key'), opt.get('user_data_file'),
-      opt.get('availability_zone'), opt.get('user_packages'),
-      opt.get('auto_shutdown'), opt.get('env'), opt.get('client_cidr'))
-    commands.attach_storage(cluster, (commands.MASTER,))
-    try:
-      commands.wait_for_hadoop(cluster, 0)
-    except TimeoutException:
-      print "Timeout while waiting for Hadoop to start. Please check logs on" +\
-        " master."
-    commands.print_master_url(cluster)
+    template = InstanceTemplate((NAMENODE, SECONDARY_NAMENODE, JOBTRACKER), 1,
+                         get_image_id(service.cluster, opt),
+                         opt.get('instance_type'), opt.get('key_name'),
+                         opt.get('public_key'), opt.get('user_data_file'),
+                         opt.get('availability_zone'), opt.get('user_packages'),
+                         opt.get('auto_shutdown'), opt.get('env'))
+    service.launch_master(template, config_dir, opt.get('client_cidr'))
 
   elif command == 'launch-slaves':
-    (opt, args, cluster) = parse_options_and_config(command, LAUNCH_OPTIONS,
+    (opt, args, service) = parse_options_and_config(command, LAUNCH_OPTIONS,
                                                     ("NUM_SLAVES",))
     number_of_slaves = int(args[1])
-    check_launch_options_set(cluster, opt)
-    config_dir = get_config_dir(opt)
-    commands.launch_slaves(cluster, number_of_slaves, get_image_id(cluster, opt),
-      opt.get('instance_type'),
-      opt.get('key_name'), opt.get('public_key'), opt.get('user_data_file'),
-      opt.get('availability_zone'), opt.get('user_packages'),
-      opt.get('auto_shutdown'), opt.get('env'))
-    commands.attach_storage(cluster, (commands.SLAVE,))
-    commands.print_master_url(cluster)
+    check_launch_options_set(service.cluster, opt)
+    template = InstanceTemplate((DATANODE, TASKTRACKER), number_of_slaves,
+                         get_image_id(service.cluster, opt),
+                         opt.get('instance_type'), opt.get('key_name'),
+                         opt.get('public_key'), opt.get('user_data_file'),
+                         opt.get('availability_zone'), opt.get('user_packages'),
+                         opt.get('auto_shutdown'), opt.get('env'))
+    service.launch_slaves(template)
 
   elif command == 'launch-cluster':
-    (opt, args, cluster) = parse_options_and_config(command, LAUNCH_OPTIONS,
-                                                    ("NUM_SLAVES",))
-    number_of_slaves = int(args[1])
-    check_launch_options_set(cluster, opt)
+    (opt, args, service) = parse_options_and_config(command, LAUNCH_OPTIONS,
+                                                    ("NUM_SLAVES",),
+                                                    unbounded_args=True)
+    check_launch_options_set(service.cluster, opt)
     config_dir = get_config_dir(opt)
-    commands.launch_master(cluster, config_dir, get_image_id(cluster, opt),
-      opt.get('instance_type'),
-      opt.get('key_name'), opt.get('public_key'), opt.get('user_data_file'),
-      opt.get('availability_zone'), opt.get('user_packages'),
-      opt.get('auto_shutdown'), opt.get('env'), opt.get('client_cidr'))
-    commands.launch_slaves(cluster, number_of_slaves, get_image_id(cluster, opt),
-      opt.get('instance_type'),
-      opt.get('key_name'), opt.get('public_key'), opt.get('user_data_file'),
-      opt.get('availability_zone'), opt.get('user_packages'),
-      opt.get('auto_shutdown'), opt.get('env'))
-    commands.attach_storage(cluster, commands.ROLES)
-    try:
-      commands.wait_for_hadoop(cluster, number_of_slaves)
-    except TimeoutException:
-      print "Timeout while waiting for Hadoop to start. Please check logs on" +\
-        " cluster."
-    commands.print_master_url(cluster)
+    instance_templates = []
+    if len(args) == 2:
+      number_of_slaves = int(args[1])
+      print_deprecation(sys.argv[0], 'launch-cluster %s 1 nn,snn,jt %s dn,tt' %
+                        (service.cluster.name, number_of_slaves))
+      instance_templates = [
+        InstanceTemplate((NAMENODE, SECONDARY_NAMENODE, JOBTRACKER), 1,
+                         get_image_id(service.cluster, opt),
+                         opt.get('instance_type'), opt.get('key_name'),
+                         opt.get('public_key'), opt.get('user_data_file'),
+                         opt.get('availability_zone'), opt.get('user_packages'),
+                         opt.get('auto_shutdown'), opt.get('env')),
+        InstanceTemplate((DATANODE, TASKTRACKER), number_of_slaves,
+                         get_image_id(service.cluster, opt),
+                         opt.get('instance_type'), opt.get('key_name'),
+                         opt.get('public_key'), opt.get('user_data_file'),
+                         opt.get('availability_zone'), opt.get('user_packages'),
+                         opt.get('auto_shutdown'), opt.get('env')),
+                         ]
+    elif len(args) > 2 and len(args) % 2 == 0:
+      print_usage(sys.argv[0])
+      sys.exit(1)
+    else:
+      for i in range(len(args) / 2):
+        number = int(args[2 * i + 1])
+        roles = args[2 * i + 2].split(",")
+        instance_templates.append(
+          InstanceTemplate(roles, number, get_image_id(service.cluster, opt),
+                           opt.get('instance_type'), opt.get('key_name'),
+                           opt.get('public_key'), opt.get('user_data_file'),
+                           opt.get('availability_zone'),
+                           opt.get('user_packages'),
+                           opt.get('auto_shutdown'), opt.get('env')))
+
+    service.launch_cluster(instance_templates, config_dir,
+                           opt.get('client_cidr'))
 
   elif command == 'login':
-    (opt, args, cluster) = parse_options_and_config(command, SSH_OPTIONS)
-    instances = cluster.check_running(commands.MASTER, 1)
-    if not instances:
-      sys.exit(1)
-    subprocess.call('ssh %s root@%s' % \
-                    (xstr(opt.get('ssh_options')), instances[0].public_ip),
-                    shell=True)
+    (opt, args, service) = parse_options_and_config(command, SSH_OPTIONS)
+    service.login(opt.get('ssh_options'))
 
   elif command == 'proxy':
-    (opt, args, cluster) = parse_options_and_config(command, SSH_OPTIONS)
-    instances = cluster.check_running(commands.MASTER, 1)
-    if not instances:
-      sys.exit(1)
-    options = '-o "ConnectTimeout 10" -o "ServerAliveInterval 60" ' \
-              '-N -D 6666'
-    process = subprocess.Popen('ssh %s %s root@%s' %
-      (xstr(opt.get('ssh_options')), options, instances[0].public_ip),
-      stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
-      shell=True)
-    print """export HADOOP_CLOUD_PROXY_PID=%s;
-echo Proxy pid %s;""" % (process.pid, process.pid)
+    (opt, args, service) = parse_options_and_config(command, SSH_OPTIONS)
+    service.proxy(opt.get('ssh_options'))
 
   elif command == 'push':
-    (opt, args, cluster) = parse_options_and_config(command, SSH_OPTIONS,
+    (opt, args, service) = parse_options_and_config(command, SSH_OPTIONS,
                                                     ("FILE",))
-    instances = cluster.check_running(commands.MASTER, 1)
-    if not instances:
-      sys.exit(1)
-    subprocess.call('scp %s -r %s root@%s:' % (xstr(opt.get('ssh_options')),
-                                               args[1], instances[0].public_ip),
-                                               shell=True)
+    service.proxy(opt.get('ssh_options'), args[1])
 
   elif command == 'exec':
-    (opt, args, cluster) = parse_options_and_config(command, SSH_OPTIONS,
+    (opt, args, service) = parse_options_and_config(command, SSH_OPTIONS,
                                                     ("CMD",), True)
-    instances = cluster.check_running(commands.MASTER, 1)
-    if not instances:
-      sys.exit(1)
-    subprocess.call("ssh %s root@%s '%s'" % (xstr(opt.get('ssh_options')),
-                                             instances[0].public_ip,
-                                             " ".join(args[1:])), shell=True)
+    service.execute(opt.get('ssh_options'), args[1:])
 
   elif command == 'terminate-cluster':
-    (opt, args, cluster) = parse_options_and_config(command, FORCE_OPTIONS)
-    cluster.print_status(commands.ROLES)
-    if not opt["force"] and not _prompt("Terminate all instances?"):
-      print "Not terminating cluster."
-    else:
-      print "Terminating cluster"
-      cluster.terminate()
+    (opt, args, service) = parse_options_and_config(command, FORCE_OPTIONS)
+    service.terminate_cluster(opt["force"])
 
   elif command == 'delete-cluster':
-    (opt, args, cluster) = parse_options_and_config(command, BASIC_OPTIONS)
-    cluster.delete()
+    (opt, args, service) = parse_options_and_config(command, BASIC_OPTIONS)
+    service.delete_cluster()
 
   elif command == 'create-formatted-snapshot':
-    (opt, args, cluster) = parse_options_and_config(command, SNAPSHOT_OPTIONS,
+    (opt, args, service) = parse_options_and_config(command, SNAPSHOT_OPTIONS,
                                                     ("SIZE",))
     size = int(args[1])
     check_options_set(opt, ['availability_zone', 'key_name'])
     ami_ubuntu_intrepid_x86 = 'ami-ec48af85' # use a general AMI
-    Ec2Storage.create_formatted_snapshot(cluster, size,
+    service.create_formatted_snapshot(size,
                                          opt.get('availability_zone'),
                                          ami_ubuntu_intrepid_x86,
                                          opt.get('key_name'),
                                          xstr(opt.get('ssh_options')))
 
   elif command == 'list-storage':
-    (opt, args, cluster) = parse_options_and_config(command, BASIC_OPTIONS)
-    storage = cluster.get_storage()
-    storage.print_status(commands.ROLES)
+    (opt, args, service) = parse_options_and_config(command, BASIC_OPTIONS)
+    service.list_storage()
 
   elif command == 'create-storage':
-    (opt, args, cluster) = parse_options_and_config(command, PLACEMENT_OPTIONS,
+    (opt, args, service) = parse_options_and_config(command, PLACEMENT_OPTIONS,
                                                     ("ROLE", "NUM_INSTANCES",
                                                      "SPEC_FILE"))
-    storage = cluster.get_storage()
     role = args[1]
     number_of_instances = int(args[2])
     spec_file = args[3]
     check_options_set(opt, ['availability_zone'])
-    storage.create(role, number_of_instances, opt.get('availability_zone'),
-                   spec_file)
-    storage.print_status(commands.ROLES)
+    service.create_storage(role, number_of_instances,
+                           opt.get('availability_zone'), spec_file)
 
   elif command == 'attach-storage':
-    (opt, args, cluster) = parse_options_and_config(command, BASIC_OPTIONS,
+    (opt, args, service) = parse_options_and_config(command, BASIC_OPTIONS,
                                                     ("ROLE",))
-    storage = cluster.get_storage()
-    role = args[1]
-    storage.attach(role, cluster.get_instances_in_role(role, 'running'))
-    storage.print_status(commands.ROLES)
+    service.attach_storage(args[1])
 
   elif command == 'delete-storage':
-    (opt, args, cluster) = parse_options_and_config(command, FORCE_OPTIONS)
-    storage = cluster.get_storage()
-    storage.print_status(commands.ROLES)
-    if not opt["force"] and not _prompt("Delete all storage volumes? THIS WILL \
-      PERMANENTLY DELETE ALL DATA"):
-      print "Not deleting storage volumes."
-    else:
-      print "Deleting storage"
-      for role in commands.ROLES:
-        storage.delete(role)
+    (opt, args, service) = parse_options_and_config(command, FORCE_OPTIONS)
+    service.delete_storage(opt["force"])
 
   elif command == 'update-slaves-file':
-    (opt, args, cluster) = parse_options_and_config(command, SSH_OPTIONS)
+    (opt, args, service) = parse_options_and_config(command, SSH_OPTIONS)
     check_options_set(opt, ['private_key'])
     ssh_options = xstr(opt.get('ssh_options'))
-    instances = cluster.check_running(commands.MASTER, 1)
-    if not instances:
-      sys.exit(1)
-    master = instances[0]
-    slaves = cluster.get_instances_in_role(commands.SLAVE)
-    with open('slaves', 'w') as f:
-      for slave in slaves:
-        f.write(slave.public_ip + "\n")
-    subprocess.call('scp %s -r %s root@%s:/etc/hadoop/conf' % \
-                    (ssh_options, 'slaves', master.public_ip), shell=True)
-
-    # Copy private key
-    private_key = opt.get('private_key')
-    subprocess.call('scp %s -r %s root@%s:/root/.ssh/id_rsa' % \
-                    (ssh_options, private_key, master.public_ip), shell=True)
-    for slave in slaves:
-      subprocess.call('scp %s -r %s root@%s:/root/.ssh/id_rsa' % \
-                      (ssh_options, private_key, slave.public_ip), shell=True)
+    config_dir = get_config_dir(opt)
+    service.update_slaves_file(config_dir, ssh_options, opt.get('private_key'))
 
   else:
     print "Unrecognized command '%s'" % command

+ 3 - 3
src/contrib/cloud/src/py/hadoop/cloud/cluster.py

@@ -70,7 +70,7 @@ class Cluster(object):
     """
     raise Exception("Unimplemented")
 
-  def print_status(self, roles, state_filter="running"):
+  def print_status(self, roles=None, state_filter="running"):
     """
     Print the status of instances in the given roles, filtered by state.
     """
@@ -88,10 +88,10 @@ class Cluster(object):
     else:
       return instances
 
-  def launch_instances(self, role, number, image_id, size_id,
+  def launch_instances(self, roles, number, image_id, size_id,
                        instance_user_data, **kwargs):
     """
-    Launch instances (of the given role) in the cluster.
+    Launch instances (having the given roles) in the cluster.
     Returns a list of IDs for the instances started.
     """
     pass

+ 0 - 261
src/contrib/cloud/src/py/hadoop/cloud/commands.py

@@ -1,261 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""High-level commands that a user may want to run"""
-
-from __future__ import with_statement
-
-from hadoop.cloud.cluster import get_cluster
-from hadoop.cloud.cluster import InstanceUserData
-from hadoop.cloud.cluster import TimeoutException
-from hadoop.cloud.util import build_env_string
-from hadoop.cloud.util import url_get
-import logging
-import os
-import re
-import socket
-import sys
-import time
-
-logger = logging.getLogger(__name__)
-
-MASTER = "master"
-SLAVE = "slave"
-# ROLES contains the set of all roles in a cluster. It may be expanded in the
-# future to support HBase, or split namnode and jobtracker instances.
-ROLES = (MASTER, SLAVE)
-
-def _get_default_user_data_file_template(cluster):
-  return os.path.join(sys.path[0], 'hadoop-%s-init-remote.sh' %
-               cluster.get_provider_code())
-
-def list_all(provider):
-  """
-  Find and print clusters that have a running 'master' instance
-  """
-  clusters = get_cluster(provider).get_clusters_with_role(MASTER)
-  if not clusters:
-    print "No running clusters"
-  else:
-    for cluster in clusters:
-      print cluster
-
-def list_cluster(cluster):
-  cluster.print_status(ROLES)
-
-def launch_master(cluster, config_dir, image_id, size_id, key_name, public_key,
-                  user_data_file_template=None, placement=None,
-                  user_packages=None, auto_shutdown=None, env_strings=[],
-                  client_cidrs=[]):
-  if user_data_file_template == None:
-    user_data_file_template = _get_default_user_data_file_template(cluster)
-  if cluster.check_running(MASTER, 0) == False:
-    return  # don't proceed if another master is running
-  ebs_mappings = ''
-  storage = cluster.get_storage()
-  if storage.has_any_storage((MASTER,)):
-    ebs_mappings = storage.get_mappings_string_for_role(MASTER)
-  replacements = { "%ENV%": build_env_string(env_strings, {
-    "USER_PACKAGES": user_packages,
-    "AUTO_SHUTDOWN": auto_shutdown,
-    "EBS_MAPPINGS": ebs_mappings
-  }) }
-  instance_user_data = InstanceUserData(user_data_file_template, replacements)
-  instance_ids = cluster.launch_instances(MASTER, 1, image_id, size_id,
-                                          instance_user_data,
-                                          key_name=key_name,
-                                          public_key=public_key,
-                                          placement=placement)
-  print "Waiting for master to start (%s)" % str(instance_ids[0])
-  try:
-    cluster.wait_for_instances(instance_ids)
-    print "Master started"
-  except TimeoutException:
-    print "Timeout while waiting for master instance to start."
-    return
-  cluster.print_status((MASTER,))
-  master = cluster.check_running(MASTER, 1)[0]
-  _authorize_client_ports(cluster, master, client_cidrs)
-  _create_client_hadoop_site_file(cluster, config_dir, master)
-
-def _authorize_client_ports(cluster, master, client_cidrs):
-  if not client_cidrs:
-    logger.debug("No client CIDRs specified, using local address.")
-    client_ip = url_get('http://checkip.amazonaws.com/').strip()
-    client_cidrs = ("%s/32" % client_ip,)
-  logger.debug("Client CIDRs: %s", client_cidrs)
-  for client_cidr in client_cidrs:
-    # Allow access to port 80 on master from client
-    cluster.authorize_role(MASTER, 80, 80, client_cidr)
-    # Allow access to jobtracker UI on master from client
-    # (so we can see when the cluster is ready)
-    cluster.authorize_role(MASTER, 50030, 50030, client_cidr)
-    # Allow access to namenode and jobtracker via public address from master
-    # node
-  master_ip = socket.gethostbyname(master.public_ip)
-  cluster.authorize_role(MASTER, 8020, 8021, "%s/32" % master_ip)
-
-def _create_client_hadoop_site_file(cluster, config_dir, master):
-  cluster_dir = os.path.join(config_dir, cluster.name)
-  aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID']
-  aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY']
-  if not os.path.exists(cluster_dir):
-    os.makedirs(cluster_dir)
-  with open(os.path.join(cluster_dir, 'hadoop-site.xml'), 'w') as f:
-    f.write("""<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<!-- Put site-specific property overrides in this file. -->
-<configuration>
-<property>
-  <name>hadoop.job.ugi</name>
-  <value>root,root</value>
-</property>
-<property>
-  <name>fs.default.name</name>
-  <value>hdfs://%(master)s:8020/</value>
-</property>
-<property>
-  <name>mapred.job.tracker</name>
-  <value>%(master)s:8021</value>
-</property>
-<property>
-  <name>hadoop.socks.server</name>
-  <value>localhost:6666</value>
-</property>
-<property>
-  <name>hadoop.rpc.socket.factory.class.default</name>
-  <value>org.apache.hadoop.net.SocksSocketFactory</value>
-</property>
-<property>
-  <name>fs.s3.awsAccessKeyId</name>
-  <value>%(aws_access_key_id)s</value>
-</property>
-<property>
-  <name>fs.s3.awsSecretAccessKey</name>
-  <value>%(aws_secret_access_key)s</value>
-</property>
-<property>
-  <name>fs.s3n.awsAccessKeyId</name>
-  <value>%(aws_access_key_id)s</value>
-</property>
-<property>
-  <name>fs.s3n.awsSecretAccessKey</name>
-  <value>%(aws_secret_access_key)s</value>
-</property>
-</configuration>
-""" % {'master': master.public_ip,
-  'aws_access_key_id': aws_access_key_id,
-  'aws_secret_access_key': aws_secret_access_key})
-
-def launch_slaves(cluster, number, image_id, size_id, key_name,
-                  public_key,
-                  user_data_file_template=None, placement=None,
-                  user_packages=None, auto_shutdown=None, env_strings=[]):
-  if user_data_file_template == None:
-    user_data_file_template = _get_default_user_data_file_template(cluster)
-  instances = cluster.check_running(MASTER, 1)
-  if not instances:
-    return
-  master = instances[0]
-  ebs_mappings = ''
-  storage = cluster.get_storage()
-  if storage.has_any_storage((SLAVE,)):
-    ebs_mappings = storage.get_mappings_string_for_role(SLAVE)
-  replacements = { "%ENV%": build_env_string(env_strings, {
-    "USER_PACKAGES": user_packages,
-    "AUTO_SHUTDOWN": auto_shutdown,
-    "EBS_MAPPINGS": ebs_mappings,
-    "MASTER_HOST": master.public_ip
-  }) }
-  instance_user_data = InstanceUserData(user_data_file_template, replacements)
-  instance_ids = cluster.launch_instances(SLAVE, number, image_id, size_id,
-                                          instance_user_data,
-                                          key_name=key_name,
-                                          public_key=public_key,
-                                          placement=placement)
-  print "Waiting for slaves to start"
-  try:
-    cluster.wait_for_instances(instance_ids)
-    print "Slaves started"
-  except TimeoutException:
-    print "Timeout while waiting for slave instances to start."
-    return
-  print
-  cluster.print_status((SLAVE,))
-
-def wait_for_hadoop(cluster, number, timeout=600):
-  start_time = time.time()
-  instances = cluster.check_running(MASTER, 1)
-  if not instances:
-    return
-  master = instances[0]
-  print "Waiting for jobtracker to start"
-  previous_running = 0
-  while True:
-    if (time.time() - start_time >= timeout):
-      raise TimeoutException()
-    try:
-      actual_running = _number_of_tasktrackers(master.public_ip, 1)
-      break
-    except IOError:
-      pass
-    sys.stdout.write(".")
-    sys.stdout.flush()
-    time.sleep(1)
-  print
-  if number > 0:
-    print "Waiting for %d tasktrackers to start" % number
-    while actual_running < number:
-      if (time.time() - start_time >= timeout):
-        raise TimeoutException()
-      try:
-        actual_running = _number_of_tasktrackers(master.public_ip, 5, 2)
-        if actual_running != previous_running:
-          sys.stdout.write("%d" % actual_running)
-        sys.stdout.write(".")
-        sys.stdout.flush()
-        time.sleep(1)
-        previous_running = actual_running
-      except IOError:
-        raise TimeoutException()
-    print
-
-# The optional ?type=active is a difference between Hadoop 0.18 and 0.20
-NUMBER_OF_TASK_TRACKERS = re.compile(
-  r'<a href="machines.jsp(?:\?type=active)?">(\d+)</a>')
-
-def _number_of_tasktrackers(jt_hostname, timeout, retries=0):
-  jt_page = url_get("http://%s:50030/jobtracker.jsp" % jt_hostname, timeout,
-                    retries)
-  m = NUMBER_OF_TASK_TRACKERS.search(jt_page)
-  if m:
-    return int(m.group(1))
-  return 0
-
-def print_master_url(cluster):
-  instances = cluster.check_running(MASTER, 1)
-  if not instances:
-    return
-  master = instances[0]
-  print "Browse the cluster at http://%s/" % master.public_ip
-
-def attach_storage(cluster, roles):
-  storage = cluster.get_storage()
-  if storage.has_any_storage(roles):
-    print "Waiting 10 seconds before attaching storage"
-    time.sleep(10)
-    for role in roles:
-      storage.attach(role, cluster.get_instances_in_role(role, 'running'))
-    storage.print_status(roles)

+ 49 - 31
src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py

@@ -85,7 +85,7 @@ class Ec2Cluster(Cluster):
     return self.name
 
   def _check_role_name(self, role):
-    if not re.match("^[a-zA-Z0-9_]+$", role):
+    if not re.match("^[a-zA-Z0-9_+]+$", role):
       raise RoleSyntaxException("Invalid role name '%s'" % role)
 
   def _group_name_for_role(self, role):
@@ -95,9 +95,11 @@ class Ec2Cluster(Cluster):
     self._check_role_name(role)
     return "%s-%s" % (self.name, role)
 
-  def _get_group_names(self, role):
-    self._check_role_name(role)
-    return [self._get_cluster_group_name(), self._group_name_for_role(role)]
+  def _get_group_names(self, roles):
+    group_names = [self._get_cluster_group_name()]
+    for role in roles:
+      group_names.append(self._group_name_for_role(role))
+    return group_names
 
   def _get_all_group_names(self):
     security_groups = self.ec2Connection.get_all_security_groups()
@@ -111,7 +113,7 @@ class Ec2Cluster(Cluster):
     if self.name not in all_group_names:
       return r
     for group in all_group_names:
-      if re.match("^%s(-[a-zA-Z0-9_]+)?$" % self.name, group):
+      if re.match("^%s(-[a-zA-Z0-9_+]+)?$" % self.name, group):
         r.append(group)
     return r
 
@@ -198,25 +200,31 @@ class Ec2Cluster(Cluster):
       instance.state, xstr(instance.key_name), instance.instance_type,
       str(instance.launch_time), instance.placement))
 
-  def print_status(self, roles, state_filter="running"):
+  def print_status(self, roles=None, state_filter="running"):
     """
     Print the status of instances in the given roles, filtered by state.
     """
-    for role in roles:
-      for instance in self._get_instances(self._group_name_for_role(role),
+    if not roles:
+      for instance in self._get_instances(self._get_cluster_group_name(),
                                           state_filter):
-        self._print_instance(role, instance)
-
-  def launch_instances(self, role, number, image_id, size_id,
+        self._print_instance("", instance)
+    else:
+      for role in roles:
+        for instance in self._get_instances(self._group_name_for_role(role),
+                                            state_filter):
+          self._print_instance(role, instance)
+
+  def launch_instances(self, roles, number, image_id, size_id,
                        instance_user_data, **kwargs):
-    self._check_role_name(role)
-
-    self._create_groups(role)
+    for role in roles:
+      self._check_role_name(role)  
+      self._create_groups(role)
+      
     user_data = instance_user_data.read_as_gzip_stream()
 
     reservation = self.ec2Connection.run_instances(image_id, min_count=number,
       max_count=number, key_name=kwargs.get('key_name', None),
-      security_groups=self._get_group_names(role), user_data=user_data,
+      security_groups=self._get_group_names(roles), user_data=user_data,
       instance_type=size_id, placement=kwargs.get('placement', None))
     return [instance.id for instance in reservation.instances]
 
@@ -372,6 +380,11 @@ class Ec2Storage(Storage):
         return True
     return False
 
+  def get_roles(self):
+    storage_filename = self._get_storage_filename()
+    volume_manager = JsonVolumeManager(storage_filename)
+    return volume_manager.get_roles()
+  
   def _get_ec2_volumes_dict(self, mountable_volumes):
     volume_ids = [mv.volume_id for mv in sum(mountable_volumes, [])]
     volumes = self.cluster.ec2Connection.get_all_volumes(volume_ids)
@@ -386,7 +399,11 @@ class Ec2Storage(Storage):
                      volume.status, str(volume.create_time),
                      str(volume.attach_time)))
 
-  def print_status(self, roles):
+  def print_status(self, roles=None):
+    if roles == None:
+      storage_filename = self._get_storage_filename()
+      volume_manager = JsonVolumeManager(storage_filename)
+      roles = volume_manager.get_roles()
     for role in roles:
       mountable_volumes_list = self._get_mountable_volumes(role)
       ec2_volumes = self._get_ec2_volumes_dict(mountable_volumes_list)
@@ -441,20 +458,21 @@ class Ec2Storage(Storage):
         print "Attaching %s to %s" % (volume.id, instance.id)
         volume.attach(instance.id, mountable_volume.device)
 
-  def delete(self, role):
+  def delete(self, roles=[]):
     storage_filename = self._get_storage_filename()
     volume_manager = JsonVolumeManager(storage_filename)
-    mountable_volumes_list = volume_manager.get_instance_storage_for_role(role)
-    ec2_volumes = self._get_ec2_volumes_dict(mountable_volumes_list)
-    all_available = True
-    for volume in ec2_volumes.itervalues():
-      if volume.status != 'available':
-        all_available = False
-        logger.warning("Volume %s is not available.", volume)
-    if not all_available:
-      logger.warning("Some volumes are still in use for role %s.\
-        Aborting delete.", role)
-      return
-    for volume in ec2_volumes.itervalues():
-      volume.delete()
-    volume_manager.remove_instance_storage_for_role(role)
+    for role in roles:
+      mountable_volumes_list = volume_manager.get_instance_storage_for_role(role)
+      ec2_volumes = self._get_ec2_volumes_dict(mountable_volumes_list)
+      all_available = True
+      for volume in ec2_volumes.itervalues():
+        if volume.status != 'available':
+          all_available = False
+          logger.warning("Volume %s is not available.", volume)
+      if not all_available:
+        logger.warning("Some volumes are still in use for role %s.\
+          Aborting delete.", role)
+        return
+      for volume in ec2_volumes.itervalues():
+        volume.delete()
+      volume_manager.remove_instance_storage_for_role(role)

+ 462 - 0
src/contrib/cloud/src/py/hadoop/cloud/service.py

@@ -0,0 +1,462 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Classes for running services on a cluster.
+"""
+
+from __future__ import with_statement
+
+from hadoop.cloud.cluster import get_cluster
+from hadoop.cloud.cluster import InstanceUserData
+from hadoop.cloud.cluster import TimeoutException
+from hadoop.cloud.providers.ec2 import Ec2Storage
+from hadoop.cloud.util import build_env_string
+from hadoop.cloud.util import url_get
+from hadoop.cloud.util import xstr
+import logging
+import os
+import re
+import socket
+import subprocess
+import sys
+import time
+
+logger = logging.getLogger(__name__)
+
+MASTER = "master"  # Deprecated.
+
+NAMENODE = "nn"
+SECONDARY_NAMENODE = "snn"
+JOBTRACKER = "jt"
+DATANODE = "dn"
+TASKTRACKER = "tt"
+
+class InstanceTemplate(object):
+  """
+  A template for creating server instances in a cluster.
+  """
+  def __init__(self, roles, number, image_id, size_id,
+                     key_name, public_key,
+                     user_data_file_template=None, placement=None,
+                     user_packages=None, auto_shutdown=None, env_strings=[]):
+    self.roles = roles
+    self.number = number
+    self.image_id = image_id
+    self.size_id = size_id
+    self.key_name = key_name
+    self.public_key = public_key
+    self.user_data_file_template = user_data_file_template
+    self.placement = placement
+    self.user_packages = user_packages
+    self.auto_shutdown = auto_shutdown
+    self.env_strings = env_strings
+
+  def add_env_strings(self, env_strings):
+    new_env_strings = list(self.env_strings or [])
+    new_env_strings.extend(env_strings)
+    self.env_strings = new_env_strings
+
+class HadoopService(object):
+  """
+  A HDFS and MapReduce service.
+  """
+  
+  def __init__(self, cluster):
+    self.cluster = cluster
+    
+  def list_all(self, provider):
+    """
+    Find and print clusters that have a running namenode instances
+    """
+    legacy_clusters = get_cluster(provider).get_clusters_with_role(MASTER)
+    clusters = get_cluster(provider).get_clusters_with_role(NAMENODE)
+    clusters.extend(legacy_clusters)
+    if not clusters:
+      print "No running clusters"
+    else:
+      for cluster in clusters:
+        print cluster
+    
+  def list(self):
+    self.cluster.print_status()
+
+  def launch_master(self, instance_template, config_dir, client_cidr):
+    if self.cluster.check_running(NAMENODE, 0) == False:
+      return  # don't proceed if another master is running
+    self.launch_cluster((instance_template,), config_dir, client_cidr)
+  
+  def launch_slaves(self, instance_template):
+    instances = self.cluster.check_running(NAMENODE, 1)
+    if not instances:
+      return
+    master = instances[0]
+    for role in (NAMENODE, SECONDARY_NAMENODE, JOBTRACKER): 
+      singleton_host_env = "%s_HOST=%s" % \
+              (self._sanitize_role_name(role), master.public_ip)
+      instance_template.add_env_strings((singleton_host_env))
+    self._launch_instances(instance_template)              
+    self._attach_storage(instance_template.roles)
+    self._print_master_url()
+      
+  def launch_cluster(self, instance_templates, config_dir, client_cidr):
+    number_of_tasktrackers = 0
+    roles = []
+    for it in instance_templates:
+      roles.extend(it.roles)
+      if TASKTRACKER in it.roles:
+        number_of_tasktrackers += it.number
+    self._launch_cluster_instances(instance_templates)
+    self._create_client_hadoop_site_file(config_dir)
+    self._authorize_client_ports(client_cidr)
+    self._attach_storage(roles)
+    try:
+      self._wait_for_hadoop(number_of_tasktrackers)
+    except TimeoutException:
+      print "Timeout while waiting for Hadoop to start. Please check logs on" +\
+        " cluster."
+    self._print_master_url()
+    
+  def login(self, ssh_options):
+    master = self._get_master()
+    if not master:
+      sys.exit(1)
+    subprocess.call('ssh %s root@%s' % \
+                    (xstr(ssh_options), master.public_ip),
+                    shell=True)
+    
+  def proxy(self, ssh_options):
+    master = self._get_master()
+    if not master:
+      sys.exit(1)
+    options = '-o "ConnectTimeout 10" -o "ServerAliveInterval 60" ' \
+              '-N -D 6666'
+    process = subprocess.Popen('ssh %s %s root@%s' %
+      (xstr(ssh_options), options, master.public_ip),
+      stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+      shell=True)
+    print """export HADOOP_CLOUD_PROXY_PID=%s;
+echo Proxy pid %s;""" % (process.pid, process.pid)
+    
+  def push(self, ssh_options, file):
+    master = self._get_master()
+    if not master:
+      sys.exit(1)
+    subprocess.call('scp %s -r %s root@%s:' % (xstr(ssh_options),
+                                               file, master.public_ip),
+                                               shell=True)
+    
+  def push(self, ssh_options, file):
+    master = self._get_master()
+    if not master:
+      sys.exit(1)
+    subprocess.call('scp %s -r %s root@%s:' % (xstr(ssh_options),
+                                               file, master.public_ip),
+                                               shell=True)
+    
+  def execute(self, ssh_options, args):
+    master = self._get_master()
+    if not master:
+      sys.exit(1)
+    subprocess.call("ssh %s root@%s '%s'" % (xstr(ssh_options),
+                                             master.public_ip,
+                                             " ".join(args)), shell=True)
+  
+  def terminate_cluster(self,  force=False):
+    self.cluster.print_status()
+    if not force and not self._prompt("Terminate all instances?"):
+      print "Not terminating cluster."
+    else:
+      print "Terminating cluster"
+      self.cluster.terminate()
+      
+  def delete_cluster(self):
+    self.cluster.delete()
+    
+  def create_formatted_snapshot(self, size, availability_zone,
+                                image_id, key_name, ssh_options):
+    Ec2Storage.create_formatted_snapshot(self.cluster, size,
+                                         availability_zone,
+                                         image_id,
+                                         key_name,
+                                         ssh_options)
+
+  def list_storage(self):
+    storage = self.cluster.get_storage()
+    storage.print_status()
+
+  def create_storage(self, role, number_of_instances,
+                     availability_zone, spec_file):
+    storage = self.cluster.get_storage()
+    storage.create(role, number_of_instances, availability_zone, spec_file)
+    storage.print_status()
+    
+  def attach_storage(self, role):
+    storage = self.cluster.get_storage()
+    storage.attach(role, self.cluster.get_instances_in_role(role, 'running'))
+    storage.print_status()
+    
+  def delete_storage(self, force=False):
+    storage = self.cluster.get_storage()
+    storage.print_status()
+    if not force and not self._prompt("Delete all storage volumes? THIS WILL \
+      PERMANENTLY DELETE ALL DATA"):
+      print "Not deleting storage volumes."
+    else:
+      print "Deleting storage"
+      for role in storage.get_roles():
+        storage.delete(role)
+        
+  def update_slaves_file(self, config_dir, ssh_options, private_key):
+    instances = self.cluster.check_running(NAMENODE, 1)
+    if not instances:
+      sys.exit(1)
+    master = instances[0]
+    slaves = self.cluster.get_instances_in_role(DATANODE, "running")
+    cluster_dir = os.path.join(config_dir, self.cluster.name)
+    slaves_file = os.path.join(cluster_dir, 'slaves')
+    with open(slaves_file, 'w') as f:
+      for slave in slaves:
+        f.write(slave.public_ip + "\n")
+    subprocess.call('scp %s -r %s root@%s:/etc/hadoop/conf' % \
+                    (ssh_options, slaves_file, master.public_ip), shell=True)
+    # Copy private key
+    subprocess.call('scp %s -r %s root@%s:/root/.ssh/id_rsa' % \
+                    (ssh_options, private_key, master.public_ip), shell=True)
+    for slave in slaves:
+      subprocess.call('scp %s -r %s root@%s:/root/.ssh/id_rsa' % \
+                      (ssh_options, private_key, slave.public_ip), shell=True)
+        
+  def _prompt(self, prompt):
+    """ Returns true if user responds "yes" to prompt. """
+    return raw_input("%s [yes or no]: " % prompt).lower() == "yes"
+  
+
+  def _get_default_user_data_file_template(self):
+    return os.path.join(sys.path[0], 'hadoop-%s-init-remote.sh' %
+                 self.cluster.get_provider_code())
+  
+  def _get_master(self):
+    # For split namenode/jobtracker, designate the namenode as the master
+    return self._get_namenode()
+
+  def _get_namenode(self):
+    instances = self.cluster.get_instances_in_role(NAMENODE, "running")
+    if not instances:
+      return None
+    return instances[0]
+
+  def _get_jobtracker(self):
+    instances = self.cluster.get_instances_in_role(JOBTRACKER, "running")
+    if not instances:
+      return None
+    return instances[0]
+
+  def _launch_cluster_instances(self, instance_templates):
+    singleton_hosts = []
+    for instance_template in instance_templates:
+      instance_template.add_env_strings(singleton_hosts)
+      instances = self._launch_instances(instance_template)
+      if instance_template.number == 1:
+        if len(instances) != 1:
+          logger.error("Expected a single '%s' instance, but found %s.",
+                       "".join(instance_template.roles), len(instances))
+          return
+        else:
+          for role in instance_template.roles:
+            singleton_host_env = "%s_HOST=%s" % \
+              (self._sanitize_role_name(role),
+               instances[0].public_ip)
+            singleton_hosts.append(singleton_host_env)
+
+  def _sanitize_role_name(self, role):
+    """Replace characters in role name with ones allowed in bash variable names"""
+    return role.replace('+', '_').upper()
+
+  def _launch_instances(self, instance_template):
+    it = instance_template
+    user_data_file_template = it.user_data_file_template
+    if it.user_data_file_template == None:
+      user_data_file_template = self._get_default_user_data_file_template()
+    ebs_mappings = ''
+    storage = self.cluster.get_storage()
+    for role in it.roles:
+      if storage.has_any_storage((role,)):
+        ebs_mappings = storage.get_mappings_string_for_role(role)
+    replacements = { "%ENV%": build_env_string(it.env_strings, {
+      "ROLES": ",".join(it.roles),
+      "USER_PACKAGES": it.user_packages,
+      "AUTO_SHUTDOWN": it.auto_shutdown,
+      "EBS_MAPPINGS": ebs_mappings,
+    }) }
+    instance_user_data = InstanceUserData(user_data_file_template, replacements)
+    instance_ids = self.cluster.launch_instances(it.roles, it.number, it.image_id,
+                                            it.size_id,
+                                            instance_user_data,
+                                            key_name=it.key_name,
+                                            public_key=it.public_key,
+                                            placement=it.placement)
+    print "Waiting for %s instances in role %s to start" % \
+      (it.number, ",".join(it.roles))
+    try:
+      self.cluster.wait_for_instances(instance_ids)
+      print "%s instances started" % ",".join(it.roles)
+    except TimeoutException:
+      print "Timeout while waiting for %s instance to start." % ",".join(it.roles)
+      return
+    print
+    self.cluster.print_status(it.roles[0])
+    return self.cluster.get_instances_in_role(it.roles[0], "running")
+
+  def _authorize_client_ports(self, client_cidrs=[]):
+    if not client_cidrs:
+      logger.debug("No client CIDRs specified, using local address.")
+      client_ip = url_get('http://checkip.amazonaws.com/').strip()
+      client_cidrs = ("%s/32" % client_ip,)
+    logger.debug("Client CIDRs: %s", client_cidrs)
+    namenode = self._get_namenode()
+    jobtracker = self._get_jobtracker()
+    for client_cidr in client_cidrs:
+      # Allow access to port 80 on namenode from client
+      self.cluster.authorize_role(NAMENODE, 80, 80, client_cidr)
+      # Allow access to jobtracker UI on master from client
+      # (so we can see when the cluster is ready)
+      self.cluster.authorize_role(JOBTRACKER, 50030, 50030, client_cidr)
+    # Allow access to namenode and jobtracker via public address from each other
+    namenode_ip = socket.gethostbyname(namenode.public_ip)
+    jobtracker_ip = socket.gethostbyname(jobtracker.public_ip)
+    self.cluster.authorize_role(NAMENODE, 8020, 8020, "%s/32" % namenode_ip)
+    self.cluster.authorize_role(NAMENODE, 8020, 8020, "%s/32" % jobtracker_ip)
+    self.cluster.authorize_role(JOBTRACKER, 8021, 8021, "%s/32" % namenode_ip)
+    self.cluster.authorize_role(JOBTRACKER, 8021, 8021,
+                                "%s/32" % jobtracker_ip)
+  
+  def _create_client_hadoop_site_file(self, config_dir):
+    namenode = self._get_namenode()
+    jobtracker = self._get_jobtracker()
+    cluster_dir = os.path.join(config_dir, self.cluster.name)
+    aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID']
+    aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY']
+    if not os.path.exists(cluster_dir):
+      os.makedirs(cluster_dir)
+    with open(os.path.join(cluster_dir, 'hadoop-site.xml'), 'w') as f:
+      f.write("""<?xml version="1.0"?>
+  <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+  <!-- Put site-specific property overrides in this file. -->
+  <configuration>
+  <property>
+    <name>hadoop.job.ugi</name>
+    <value>root,root</value>
+  </property>
+  <property>
+    <name>fs.default.name</name>
+    <value>hdfs://%(namenode)s:8020/</value>
+  </property>
+  <property>
+    <name>mapred.job.tracker</name>
+    <value>%(jobtracker)s:8021</value>
+  </property>
+  <property>
+    <name>hadoop.socks.server</name>
+    <value>localhost:6666</value>
+  </property>
+  <property>
+    <name>hadoop.rpc.socket.factory.class.default</name>
+    <value>org.apache.hadoop.net.SocksSocketFactory</value>
+  </property>
+  <property>
+    <name>fs.s3.awsAccessKeyId</name>
+    <value>%(aws_access_key_id)s</value>
+  </property>
+  <property>
+    <name>fs.s3.awsSecretAccessKey</name>
+    <value>%(aws_secret_access_key)s</value>
+  </property>
+  <property>
+    <name>fs.s3n.awsAccessKeyId</name>
+    <value>%(aws_access_key_id)s</value>
+  </property>
+  <property>
+    <name>fs.s3n.awsSecretAccessKey</name>
+    <value>%(aws_secret_access_key)s</value>
+  </property>
+  </configuration>
+  """ % {'namenode': namenode.public_ip,
+    'jobtracker': jobtracker.public_ip,
+    'aws_access_key_id': aws_access_key_id,
+    'aws_secret_access_key': aws_secret_access_key})        
+
+  def _wait_for_hadoop(self, number, timeout=600):
+    start_time = time.time()
+    jobtracker = self._get_jobtracker()
+    if not jobtracker:
+      return
+    print "Waiting for jobtracker to start"
+    previous_running = 0
+    while True:
+      if (time.time() - start_time >= timeout):
+        raise TimeoutException()
+      try:
+        actual_running = self._number_of_tasktrackers(jobtracker.public_ip, 1)
+        break
+      except IOError:
+        pass
+      sys.stdout.write(".")
+      sys.stdout.flush()
+      time.sleep(1)
+    print
+    if number > 0:
+      print "Waiting for %d tasktrackers to start" % number
+      while actual_running < number:
+        if (time.time() - start_time >= timeout):
+          raise TimeoutException()
+        try:
+          actual_running = self._number_of_tasktrackers(jobtracker.public_ip, 5, 2)
+          if actual_running != previous_running:
+            sys.stdout.write("%d" % actual_running)
+          sys.stdout.write(".")
+          sys.stdout.flush()
+          time.sleep(1)
+          previous_running = actual_running
+        except IOError:
+          pass
+      print
+
+  # The optional ?type=active is a difference between Hadoop 0.18 and 0.20
+  _NUMBER_OF_TASK_TRACKERS = re.compile(
+    r'<a href="machines.jsp(?:\?type=active)?">(\d+)</a>')
+  
+  def _number_of_tasktrackers(self, jt_hostname, timeout, retries=0):
+    jt_page = url_get("http://%s:50030/jobtracker.jsp" % jt_hostname, timeout,
+                      retries)
+    m = self._NUMBER_OF_TASK_TRACKERS.search(jt_page)
+    if m:
+      return int(m.group(1))
+    return 0
+
+  def _print_master_url(self):
+    webserver = self._get_jobtracker()
+    if not webserver:
+      return
+    print "Browse the cluster at http://%s/" % webserver.public_ip
+
+  def _attach_storage(self, roles):
+    storage = self.cluster.get_storage()
+    if storage.has_any_storage(roles):
+      print "Waiting 10 seconds before attaching storage"
+      time.sleep(10)
+      for role in roles:
+        storage.attach(role, self.cluster.get_instances_in_role(role, 'running'))
+      storage.print_status(roles)

+ 13 - 3
src/contrib/cloud/src/py/hadoop/cloud/storage.py

@@ -81,6 +81,10 @@ class JsonVolumeManager(object):
 
   def _store(self, obj):
     return json.dump(obj, open(self.filename, "w"), sort_keys=True, indent=2)
+  
+  def get_roles(self):
+    json_dict = self._load()
+    return json_dict.keys()
 
   def add_instance_storage_for_role(self, role, mountable_volumes):
     json_dict = self._load()
@@ -142,7 +146,13 @@ class Storage(object):
     """
     return False
 
-  def print_status(self, roles):
+  def get_roles(self):
+    """
+    Return a list of roles that have storage defined.
+    """
+    return []
+
+  def print_status(self, roles=None):
     """
     Print the status of storage volumes for the given roles.
     """
@@ -156,8 +166,8 @@ class Storage(object):
     """
     pass
 
-  def delete(self, role):
+  def delete(self, roles=[]):
     """
-    Permanently delete all the storage for a role.
+    Permanently delete all the storage for the given roles.
     """
     pass

+ 1 - 1
src/contrib/cloud/src/test/hadoop/cloud/testcluster.py

@@ -27,7 +27,7 @@ class TestCluster(unittest.TestCase):
   def test_check_role_name_valid(self):
     cluster = Ec2Cluster("test-cluster", None)
     cluster._check_role_name(
-      "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_")
+      "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_+")
 
   def test_check_role_name_dash_is_invalid(self):
     cluster = Ec2Cluster("test-cluster", None)

+ 6 - 0
src/contrib/cloud/src/test/hadoop/cloud/teststorage.py

@@ -85,6 +85,7 @@ class TestJsonVolumeManager(unittest.TestCase):
     volume_manager = JsonVolumeManager("volumemanagertest.json")
     self.assertEqual(0,
       len(volume_manager.get_instance_storage_for_role("master")))
+    self.assertEqual(0, len(volume_manager.get_roles))
 
     volume_manager.add_instance_storage_for_role("master",
                                                  [MountableVolume("vol_1", "/",
@@ -131,6 +132,11 @@ class TestJsonVolumeManager(unittest.TestCase):
     self.assertEqual("vol_4", slave_storage_instance1_vol1.volume_id)
     self.assertEqual("/data1", slave_storage_instance1_vol1.mount_point)
     self.assertEqual("/dev/sdk", slave_storage_instance1_vol1.device)
+    
+    roles = volume_manager.get_roles
+    self.assertEqual(2, len(roles))
+    self.assertTrue("slave" in roles)
+    self.assertTrue("master" in roles)
 
 
 if __name__ == '__main__':