Forráskód Böngészése

HADOOP-6108. Add support for EBS storage on EC2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@885888 13f79535-47bb-0310-9956-ffa450edef68
Thomas White 15 éve
szülő
commit
ff6c0ef547
27 módosított fájl, 3219 hozzáadás és 0 törlés
  1. 2 0
      CHANGES.txt
  2. 1 0
      build.xml
  3. 307 0
      src/contrib/cloud/README.txt
  4. 52 0
      src/contrib/cloud/src/integration-test/create-ebs-snapshot.sh
  5. 30 0
      src/contrib/cloud/src/integration-test/ebs-storage-spec.json
  6. 122 0
      src/contrib/cloud/src/integration-test/persistent-cluster.sh
  7. 77 0
      src/contrib/cloud/src/integration-test/transient-cluster.sh
  8. 1 0
      src/contrib/cloud/src/py/VERSION
  9. 21 0
      src/contrib/cloud/src/py/hadoop-ec2
  10. 530 0
      src/contrib/cloud/src/py/hadoop-ec2-init-remote.sh
  11. 14 0
      src/contrib/cloud/src/py/hadoop/__init__.py
  12. 14 0
      src/contrib/cloud/src/py/hadoop/cloud/__init__.py
  13. 456 0
      src/contrib/cloud/src/py/hadoop/cloud/cli.py
  14. 186 0
      src/contrib/cloud/src/py/hadoop/cloud/cluster.py
  15. 261 0
      src/contrib/cloud/src/py/hadoop/cloud/commands.py
  16. 14 0
      src/contrib/cloud/src/py/hadoop/cloud/providers/__init__.py
  17. 61 0
      src/contrib/cloud/src/py/hadoop/cloud/providers/dummy.py
  18. 460 0
      src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py
  19. 163 0
      src/contrib/cloud/src/py/hadoop/cloud/storage.py
  20. 84 0
      src/contrib/cloud/src/py/hadoop/cloud/util.py
  21. 14 0
      src/contrib/cloud/src/test/hadoop/__init__.py
  22. 14 0
      src/contrib/cloud/src/test/hadoop/cloud/__init__.py
  23. 36 0
      src/contrib/cloud/src/test/hadoop/cloud/alltests.py
  24. 37 0
      src/contrib/cloud/src/test/hadoop/cloud/testcluster.py
  25. 137 0
      src/contrib/cloud/src/test/hadoop/cloud/teststorage.py
  26. 44 0
      src/contrib/cloud/src/test/hadoop/cloud/testuserdata.py
  27. 81 0
      src/contrib/cloud/src/test/hadoop/cloud/testutil.py

+ 2 - 0
CHANGES.txt

@@ -14,6 +14,8 @@ Trunk (unreleased changes)
     and the init of the class is made to take a Configuration argument.
     (Jakob Homan via ddas)
 
+    HADOOP-6108. Add support for EBS storage on EC2. (tomwhite)
+
   IMPROVEMENTS
 
     HADOOP-6283. Improve the exception messages thrown by

+ 1 - 0
build.xml

@@ -1200,6 +1200,7 @@
         <exclude name="**/native/*"/>
         <exclude name="**/native/config/*"/>
         <exclude name="**/VERSION"/>
+        <exclude name="**/*.json"/>
         <exclude name="**/hod/*.txt"/>
       </fileset>
     </rat:report>

+ 307 - 0
src/contrib/cloud/README.txt

@@ -0,0 +1,307 @@
+Hadoop Cloud Scripts
+====================
+
+These scripts allow you to run Hadoop on cloud providers. Currently only Amazon
+EC2 is supported, but more providers are expected to be added over time.
+
+Getting Started
+===============
+
+First, unpack the scripts on your system. For convenience, you may like to put
+the top-level directory on your path.
+
+You'll also need python (version 2.5 or newer) and the boto and simplejson
+libraries. After you download boto and simplejson, you can install each in turn
+by running the following in the directory where you unpacked the distribution:
+
+% sudo python setup.py install
+
+Alternatively, you might like to use the python-boto and python-simplejson RPM
+and Debian packages.
+
+You need to tell the scripts your AWS credentials. The simplest way to do this
+is to set the environment variables (but see
+http://code.google.com/p/boto/wiki/BotoConfig for other options):
+
+    * AWS_ACCESS_KEY_ID - Your AWS Access Key ID
+    * AWS_SECRET_ACCESS_KEY - Your AWS Secret Access Key
+
+To configure the scripts, create a directory called .hadoop-cloud (note the
+leading ".") in your home directory. In it, create a file called
+clusters.cfg with a section for each cluster you want to control. e.g.:
+
+[my-hadoop-cluster]
+image_id=ami-6159bf08
+instance_type=c1.medium
+key_name=tom
+availability_zone=us-east-1c
+private_key=PATH_TO_PRIVATE_KEY
+ssh_options=-i %(private_key)s -o StrictHostKeyChecking=no
+
+The image chosen here is one with a i386 Fedora OS. For a list of suitable AMIs
+see http://wiki.apache.org/hadoop/AmazonEC2.
+
+The architecture must be compatible with the instance type. For m1.small and
+c1.medium instances use the i386 AMIs, while for m1.large, m1.xlarge, and
+c1.xlarge instances use the x86_64 AMIs. One of the high CPU instances
+(c1.medium or c1.xlarge) is recommended.
+
+Then you can run the hadoop-ec2 script. It will display usage instructions when
+invoked without arguments.
+
+You can test that it can connect to AWS by typing:
+
+% hadoop-ec2 list
+
+LAUNCHING A CLUSTER
+===================
+
+To launch a cluster called "my-hadoop-cluster" with 10 worker (slave) nodes
+type:
+
+% hadoop-ec2 launch-cluster my-hadoop-cluster 10
+
+This will boot the master node and 10 worker nodes. When the nodes have started
+and the Hadoop cluster has come up, the console will display a message like
+
+  Browse the cluster at http://ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com/
+
+You can access Hadoop's web UI by visiting this URL. By default, port 80 is
+opened for access from your client machine. You may change the firewall settings
+(to allow access from a network, rather than just a single machine, for example)
+by using the Amazon EC2 command line tools, or by using a tool like Elastic Fox.
+The security group to change is the one named <cluster-name>-master.
+
+For security reasons, traffic from the network your client is running on is
+proxied through the master node of the cluster using an SSH tunnel (a SOCKS
+proxy on port 6666). To set up the proxy run the following command:
+
+% hadoop-ec2 proxy my-hadoop-cluster
+
+Web browsers need to be configured to use this proxy too, so you can view pages
+served by worker nodes in the cluster. The most convenient way to do this is to
+use a proxy auto-config (PAC) file, such as this one:
+
+  http://apache-hadoop-ec2.s3.amazonaws.com/proxy.pac
+
+If you are using Firefox, then you may find
+FoxyProxy useful for managing PAC files. (If you use FoxyProxy, then you need to
+get it to use the proxy for DNS lookups. To do this, go to Tools -> FoxyProxy ->
+Options, and then under "Miscellaneous" in the bottom left, choose "Use SOCKS
+proxy for DNS lookups".)
+
+PERSISTENT CLUSTERS
+===================
+
+Hadoop clusters running on EC2 that use local EC2 storage (the default) will not
+retain data once the cluster has been terminated. It is possible to use EBS for
+persistent data, which allows a cluster to be shut down while it is not being
+used.
+
+Note: EBS support is a Beta feature.
+
+First create a new section called "my-ebs-cluster" in the
+.hadoop-cloud/clusters.cfg file.
+
+Now we need to create storage for the new cluster. Create a temporary EBS volume
+of size 100GiB, format it, and save it as a snapshot in S3. This way, we only
+have to do the formatting once.
+
+% hadoop-ec2 create-formatted-snapshot my-ebs-cluster 100
+
+We create storage for a single master and for two slaves. The volumes to create
+are described in a JSON spec file, which references the snapshot we just
+created. Here is the contents of a JSON file, called
+my-ebs-cluster-storage-spec.json:
+
+{
+  "master": [
+    {
+      "device": "/dev/sdj",
+      "mount_point": "/ebs1",
+      "size_gb": "100",
+      "snapshot_id": "snap-268e704f"
+    },
+    {
+      "device": "/dev/sdk",
+      "mount_point": "/ebs2",
+      "size_gb": "100",
+      "snapshot_id": "snap-268e704f"
+    }
+  ],
+  "slave": [
+    {
+      "device": "/dev/sdj",
+      "mount_point": "/ebs1",
+      "size_gb": "100",
+      "snapshot_id": "snap-268e704f"
+    },
+    {
+      "device": "/dev/sdk",
+      "mount_point": "/ebs2",
+      "size_gb": "100",
+      "snapshot_id": "snap-268e704f"
+    }
+  ]
+}
+
+
+Each role (here "master" and "slave") is the key to an array of volume
+specifications. In this example, the "slave" role has two devices ("/dev/sdj"
+and "/dev/sdk") with different mount points, sizes, and generated from an EBS
+snapshot. The snapshot is the formatted snapshot created earlier, so that the
+volumes we create are pre-formatted. The size of the drives must match the size
+of the snapshot created earlier.
+
+Let's create actual volumes using this file.
+
+% hadoop-ec2 create-storage my-ebs-cluster master 1 \
+    my-ebs-cluster-storage-spec.json
+% hadoop-ec2 create-storage my-ebs-cluster slave 2 \
+    my-ebs-cluster-storage-spec.json
+
+Now let's start the cluster with 2 slave nodes:
+
+% hadoop-ec2 launch-cluster my-ebs-cluster 2
+
+Login and run a job which creates some output.
+
+% hadoop-ec2 login my-ebs-cluster
+
+# hadoop fs -mkdir input
+# hadoop fs -put /etc/hadoop/conf/*.xml input
+# hadoop jar /usr/lib/hadoop/hadoop-*-examples.jar grep input output \
+    'dfs[a-z.]+'
+
+Look at the output:
+
+# hadoop fs -cat output/part-00000 | head
+
+Now let's shutdown the cluster.
+
+% hadoop-ec2 terminate-cluster my-ebs-cluster
+
+A little while later we restart the cluster and login.
+
+% hadoop-ec2 launch-cluster my-ebs-cluster 2
+% hadoop-ec2 login my-ebs-cluster
+
+The output from the job we ran before should still be there:
+
+# hadoop fs -cat output/part-00000 | head
+
+RUNNING JOBS
+============
+
+When you launched the cluster, a hadoop-site.xml file was created in the
+directory ~/.hadoop-cloud/<cluster-name>. You can use this to connect to the
+cluster by setting the HADOOP_CONF_DIR enviroment variable (it is also possible
+to set the configuration file to use by passing it as a -conf option to Hadoop
+Tools):
+
+% export HADOOP_CONF_DIR=~/.hadoop-cloud/my-hadoop-cluster
+
+Let's try browsing HDFS:
+
+% hadoop fs -ls /
+
+Running a job is straightforward:
+
+% hadoop fs -mkdir input # create an input directory
+% hadoop fs -put $HADOOP_HOME/LICENSE.txt input # copy a file there
+% hadoop jar $HADOOP_HOME/hadoop-*-examples.jar wordcount input output
+% hadoop fs -cat output/part-00000 | head
+
+Of course, these examples assume that you have installed Hadoop on your local
+machine. It is also possible to launch jobs from within the cluster. First log
+into the master node:
+
+% hadoop-ec2 login my-hadoop-cluster
+
+Then run a job as before:
+
+# hadoop fs -mkdir input
+# hadoop fs -put /etc/hadoop/conf/*.xml input
+# hadoop jar /usr/lib/hadoop/hadoop-*-examples.jar grep input output 'dfs[a-z.]+'
+# hadoop fs -cat output/part-00000 | head
+
+TERMINATING A CLUSTER
+=====================
+
+When you've finished with your cluster you can stop it with the following
+command.
+
+NOTE: ALL DATA WILL BE LOST UNLESS YOU ARE USING EBS!
+
+% hadoop-ec2 terminate-cluster my-hadoop-cluster
+
+You can then delete the EC2 security groups with:
+
+% hadoop-ec2 delete-cluster my-hadoop-cluster
+
+AUTOMATIC CLUSTER SHUTDOWN
+==========================
+
+You may use the --auto-shutdown option to automatically terminate a cluster
+a given time (specified in minutes) after launch. This is useful for short-lived
+clusters where the jobs complete in a known amount of time.
+
+If you want to cancel the automatic shutdown, then run
+
+% hadoop-ec2 exec my-hadoop-cluster shutdown -c
+% hadoop-ec2 update-slaves-file my-hadoop-cluster
+% hadoop-ec2 exec my-hadoop-cluster /usr/lib/hadoop/bin/slaves.sh shutdown -c
+
+CONFIGURATION NOTES
+===================
+
+It is possible to specify options on the command line: these take precedence
+over any specified in the configuration file. For example:
+
+% hadoop-ec2 launch-cluster --image-id ami-2359bf4a --instance-type c1.xlarge \
+  my-hadoop-cluster 10
+
+This command launches a 10-node cluster using the specified image and instance
+type, overriding the equivalent settings (if any) that are in the
+"my-hadoop-cluster" section of the configuration file. Note that words in
+options are separated by hyphens (--instance-type) while the corresponding
+configuration parameter is are separated by underscores (instance_type).
+
+The scripts install Hadoop RPMs or Debian packages (depending on the OS) at
+instance boot time.
+
+By default, Apache Hadoop 0.20.1 is installed. You can also run other versions
+of Apache Hadoop. For example the following uses version 0.18.3:
+
+% hadoop-ec2 launch-cluster --env HADOOP_VERSION=0.18.3 \
+  my-hadoop-cluster 10
+
+CUSTOMIZATION
+=============
+
+You can specify a list of packages to install on every instance at boot time
+using the --user-packages command-line option (or the user_packages
+configuration parameter). Packages should be space-separated. Note that package
+names should reflect the package manager being used to install them (yum or
+apt-get depending on the OS).
+
+Here's an example that installs RPMs for R and git:
+
+% hadoop-ec2 launch-cluster --user-packages 'R git-core' my-hadoop-cluster 10
+
+You have full control over the script that is run when each instance boots. The
+default script, hadoop-ec2-init-remote.sh, may be used as a starting point to
+add extra configuration or customization of the instance. Make a copy of the
+script in your home directory, or somewhere similar, and set the
+--user-data-file command-line option (or the user_data_file configuration
+parameter) to point to the (modified) copy.  hadoop-ec2 will replace "%ENV%"
+in your user data script with
+USER_PACKAGES, AUTO_SHUTDOWN, and EBS_MAPPINGS, as well as extra parameters
+supplied using the --env commandline flag.
+
+Another way of customizing the instance, which may be more appropriate for
+larger changes, is to create you own image.
+
+It's possible to use any image, as long as it i) runs (gzip compressed) user
+data on boot, and ii) has Java installed.
+

+ 52 - 0
src/contrib/cloud/src/integration-test/create-ebs-snapshot.sh

@@ -0,0 +1,52 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This script tests the "hadoop-ec2 create-formatted-snapshot" command.
+# The snapshot is deleted immediately afterwards.
+#
+# Example usage:
+# ./create-ebs-snapshot.sh
+#
+
+set -e
+set -x
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+WORKSPACE=${WORKSPACE:-`pwd`}
+CONFIG_DIR=${CONFIG_DIR:-$WORKSPACE/.hadoop-cloud}
+CLUSTER=${CLUSTER:-hadoop-cloud-$USER-test-cluster}
+AVAILABILITY_ZONE=${AVAILABILITY_ZONE:-us-east-1c}
+KEY_NAME=${KEY_NAME:-$USER}
+HADOOP_CLOUD_HOME=${HADOOP_CLOUD_HOME:-$bin/../py}
+HADOOP_CLOUD_PROVIDER=${HADOOP_CLOUD_PROVIDER:-ec2}
+SSH_OPTIONS=${SSH_OPTIONS:-"-i ~/.$HADOOP_CLOUD_PROVIDER/id_rsa-$KEY_NAME \
+  -o StrictHostKeyChecking=no"}
+
+HADOOP_CLOUD_SCRIPT=$HADOOP_CLOUD_HOME/hadoop-$HADOOP_CLOUD_PROVIDER
+
+$HADOOP_CLOUD_SCRIPT create-formatted-snapshot --config-dir=$CONFIG_DIR \
+  --key-name=$KEY_NAME --availability-zone=$AVAILABILITY_ZONE \
+  --ssh-options="$SSH_OPTIONS" \
+  $CLUSTER 1 > out.tmp
+
+snapshot_id=`grep 'Created snapshot' out.tmp | awk '{print $3}'`
+
+ec2-delete-snapshot $snapshot_id
+
+rm -f out.tmp

+ 30 - 0
src/contrib/cloud/src/integration-test/ebs-storage-spec.json

@@ -0,0 +1,30 @@
+{
+  "master": [
+    {
+      "device": "/dev/sdj",
+      "mount_point": "/ebs1",
+      "size_gb": "7",
+      "snapshot_id": "snap-fe44bb97"
+    },
+    {
+      "device": "/dev/sdk",
+      "mount_point": "/ebs2",
+      "size_gb": "7",
+      "snapshot_id": "snap-fe44bb97"
+    }
+  ],
+  "slave": [
+    {
+      "device": "/dev/sdj",
+      "mount_point": "/ebs1",
+      "size_gb": "7",
+      "snapshot_id": "snap-fe44bb97"
+    },
+    {
+      "device": "/dev/sdk",
+      "mount_point": "/ebs2",
+      "size_gb": "7",
+      "snapshot_id": "snap-fe44bb97"
+    }
+  ]
+}

+ 122 - 0
src/contrib/cloud/src/integration-test/persistent-cluster.sh

@@ -0,0 +1,122 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This script tests the Hadoop cloud scripts by running through a minimal
+# sequence of steps to start a persistent (EBS) cluster, run a job, then
+# shutdown the cluster.
+#
+# Example usage:
+# HADOOP_HOME=~/dev/hadoop-0.20.1/ ./persistent-cluster.sh
+#
+
+function wait_for_volume_detachment() {
+  set +e
+  set +x
+  while true; do
+    attached=`$HADOOP_CLOUD_SCRIPT list-storage --config-dir=$CONFIG_DIR \
+      $CLUSTER | awk '{print $6}' | grep 'attached'`
+    sleep 5
+    if [ -z "$attached" ]; then
+      break
+    fi
+  done
+  set -e
+  set -x
+}
+
+set -e
+set -x
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+WORKSPACE=${WORKSPACE:-`pwd`}
+CONFIG_DIR=${CONFIG_DIR:-$WORKSPACE/.hadoop-cloud}
+CLUSTER=${CLUSTER:-hadoop-cloud-ebs-$USER-test-cluster}
+IMAGE_ID=${IMAGE_ID:-ami-6159bf08} # default to Fedora 32-bit AMI
+AVAILABILITY_ZONE=${AVAILABILITY_ZONE:-us-east-1c}
+KEY_NAME=${KEY_NAME:-$USER}
+AUTO_SHUTDOWN=${AUTO_SHUTDOWN:-15}
+LOCAL_HADOOP_VERSION=${LOCAL_HADOOP_VERSION:-0.20.1}
+HADOOP_HOME=${HADOOP_HOME:-$WORKSPACE/hadoop-$LOCAL_HADOOP_VERSION}
+HADOOP_CLOUD_HOME=${HADOOP_CLOUD_HOME:-$bin/../py}
+HADOOP_CLOUD_PROVIDER=${HADOOP_CLOUD_PROVIDER:-ec2}
+SSH_OPTIONS=${SSH_OPTIONS:-"-i ~/.$HADOOP_CLOUD_PROVIDER/id_rsa-$KEY_NAME \
+  -o StrictHostKeyChecking=no"}
+
+HADOOP_CLOUD_SCRIPT=$HADOOP_CLOUD_HOME/hadoop-$HADOOP_CLOUD_PROVIDER
+export HADOOP_CONF_DIR=$CONFIG_DIR/$CLUSTER
+
+# Install Hadoop locally
+if [ ! -d $HADOOP_HOME ]; then
+  wget http://archive.apache.org/dist/hadoop/core/hadoop-\
+$LOCAL_HADOOP_VERSION/hadoop-$LOCAL_HADOOP_VERSION.tar.gz
+  tar zxf hadoop-$LOCAL_HADOOP_VERSION.tar.gz -C $WORKSPACE
+  rm hadoop-$LOCAL_HADOOP_VERSION.tar.gz
+fi
+
+# Create storage
+$HADOOP_CLOUD_SCRIPT create-storage --config-dir=$CONFIG_DIR \
+  --availability-zone=$AVAILABILITY_ZONE $CLUSTER master 1 \
+  $bin/ebs-storage-spec.json
+$HADOOP_CLOUD_SCRIPT create-storage --config-dir=$CONFIG_DIR \
+  --availability-zone=$AVAILABILITY_ZONE $CLUSTER slave 1 \
+  $bin/ebs-storage-spec.json
+
+# Launch a cluster
+$HADOOP_CLOUD_SCRIPT launch-cluster --config-dir=$CONFIG_DIR \
+  --image-id=$IMAGE_ID --key-name=$KEY_NAME --auto-shutdown=$AUTO_SHUTDOWN \
+  --availability-zone=$AVAILABILITY_ZONE $CLIENT_CIDRS $ENVS $CLUSTER 1
+
+# Run a proxy and save its pid in HADOOP_CLOUD_PROXY_PID
+eval `$HADOOP_CLOUD_SCRIPT proxy --config-dir=$CONFIG_DIR \
+  --ssh-options="$SSH_OPTIONS" $CLUSTER`
+
+# Run a job and check it works
+$HADOOP_HOME/bin/hadoop fs -mkdir input
+$HADOOP_HOME/bin/hadoop fs -put $HADOOP_HOME/LICENSE.txt input
+$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-*-examples.jar grep \
+  input output Apache
+# following returns a non-zero exit code if no match
+$HADOOP_HOME/bin/hadoop fs -cat 'output/part-00000' | grep Apache
+
+# Shutdown the cluster
+kill $HADOOP_CLOUD_PROXY_PID
+$HADOOP_CLOUD_SCRIPT terminate-cluster --config-dir=$CONFIG_DIR --force $CLUSTER
+sleep 5 # wait for termination to take effect
+
+# Relaunch the cluster
+$HADOOP_CLOUD_SCRIPT launch-cluster --config-dir=$CONFIG_DIR \
+  --image-id=$IMAGE_ID --key-name=$KEY_NAME --auto-shutdown=$AUTO_SHUTDOWN \
+  --availability-zone=$AVAILABILITY_ZONE $CLIENT_CIDRS $ENVS $CLUSTER 1
+
+# Run a proxy and save its pid in HADOOP_CLOUD_PROXY_PID
+eval `$HADOOP_CLOUD_SCRIPT proxy --config-dir=$CONFIG_DIR \
+  --ssh-options="$SSH_OPTIONS" $CLUSTER`
+
+# Check output is still there
+$HADOOP_HOME/bin/hadoop fs -cat 'output/part-00000' | grep Apache
+
+# Shutdown the cluster
+kill $HADOOP_CLOUD_PROXY_PID
+$HADOOP_CLOUD_SCRIPT terminate-cluster --config-dir=$CONFIG_DIR --force $CLUSTER
+sleep 5 # wait for termination to take effect
+
+# Cleanup
+$HADOOP_CLOUD_SCRIPT delete-cluster --config-dir=$CONFIG_DIR $CLUSTER
+wait_for_volume_detachment
+$HADOOP_CLOUD_SCRIPT delete-storage --config-dir=$CONFIG_DIR --force $CLUSTER

+ 77 - 0
src/contrib/cloud/src/integration-test/transient-cluster.sh

@@ -0,0 +1,77 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This script tests the Hadoop cloud scripts by running through a minimal
+# sequence of steps to start a cluster, run a job, then shutdown the cluster.
+#
+# Example usage:
+# HADOOP_HOME=~/dev/hadoop-0.20.1/ ./transient-cluster.sh
+#
+
+set -e
+set -x
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+WORKSPACE=${WORKSPACE:-`pwd`}
+CONFIG_DIR=${CONFIG_DIR:-$WORKSPACE/.hadoop-cloud}
+CLUSTER=${CLUSTER:-hadoop-cloud-$USER-test-cluster}
+IMAGE_ID=${IMAGE_ID:-ami-6159bf08} # default to Fedora 32-bit AMI
+AVAILABILITY_ZONE=${AVAILABILITY_ZONE:-us-east-1c}
+KEY_NAME=${KEY_NAME:-$USER}
+AUTO_SHUTDOWN=${AUTO_SHUTDOWN:-15}
+LOCAL_HADOOP_VERSION=${LOCAL_HADOOP_VERSION:-0.20.1}
+HADOOP_HOME=${HADOOP_HOME:-$WORKSPACE/hadoop-$LOCAL_HADOOP_VERSION}
+HADOOP_CLOUD_HOME=${HADOOP_CLOUD_HOME:-$bin/../py}
+HADOOP_CLOUD_PROVIDER=${HADOOP_CLOUD_PROVIDER:-ec2}
+SSH_OPTIONS=${SSH_OPTIONS:-"-i ~/.$HADOOP_CLOUD_PROVIDER/id_rsa-$KEY_NAME \
+  -o StrictHostKeyChecking=no"}
+
+HADOOP_CLOUD_SCRIPT=$HADOOP_CLOUD_HOME/hadoop-$HADOOP_CLOUD_PROVIDER
+export HADOOP_CONF_DIR=$CONFIG_DIR/$CLUSTER
+
+# Install Hadoop locally
+if [ ! -d $HADOOP_HOME ]; then
+  wget http://archive.apache.org/dist/hadoop/core/hadoop-\
+$LOCAL_HADOOP_VERSION/hadoop-$LOCAL_HADOOP_VERSION.tar.gz
+  tar zxf hadoop-$LOCAL_HADOOP_VERSION.tar.gz -C $WORKSPACE
+  rm hadoop-$LOCAL_HADOOP_VERSION.tar.gz
+fi
+
+# Launch a cluster
+$HADOOP_CLOUD_SCRIPT launch-cluster --config-dir=$CONFIG_DIR \
+  --image-id=$IMAGE_ID --key-name=$KEY_NAME --auto-shutdown=$AUTO_SHUTDOWN \
+  --availability-zone=$AVAILABILITY_ZONE $CLIENT_CIDRS $ENVS $CLUSTER 1
+
+# Run a proxy and save its pid in HADOOP_CLOUD_PROXY_PID
+eval `$HADOOP_CLOUD_SCRIPT proxy --config-dir=$CONFIG_DIR \
+  --ssh-options="$SSH_OPTIONS" $CLUSTER`
+
+# Run a job and check it works
+$HADOOP_HOME/bin/hadoop fs -mkdir input
+$HADOOP_HOME/bin/hadoop fs -put $HADOOP_HOME/LICENSE.txt input
+$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-*-examples.jar grep \
+  input output Apache
+# following returns a non-zero exit code if no match
+$HADOOP_HOME/bin/hadoop fs -cat 'output/part-00000' | grep Apache
+
+# Shutdown the cluster
+kill $HADOOP_CLOUD_PROXY_PID
+$HADOOP_CLOUD_SCRIPT terminate-cluster --config-dir=$CONFIG_DIR --force $CLUSTER
+sleep 5 # wait for termination to take effect
+$HADOOP_CLOUD_SCRIPT delete-cluster --config-dir=$CONFIG_DIR $CLUSTER

+ 1 - 0
src/contrib/cloud/src/py/VERSION

@@ -0,0 +1 @@
+0.22.0

+ 21 - 0
src/contrib/cloud/src/py/hadoop-ec2

@@ -0,0 +1,21 @@
+#!/usr/bin/env python2.5
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from hadoop.cloud.cli import main
+
+if __name__ == "__main__":
+  main()

+ 530 - 0
src/contrib/cloud/src/py/hadoop-ec2-init-remote.sh

@@ -0,0 +1,530 @@
+#!/bin/bash -x
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+################################################################################
+# Script that is run on each EC2 instance on boot. It is passed in the EC2 user
+# data, so should not exceed 16K in size after gzip compression.
+#
+# This script is executed by /etc/init.d/ec2-run-user-data, and output is
+# logged to /var/log/messages.
+################################################################################
+
+################################################################################
+# Initialize variables
+################################################################################
+
+# Substitute environment variables passed by the client
+export %ENV%
+
+if [ -z "$MASTER_HOST" ]; then
+  IS_MASTER=true
+  MASTER_HOST=`wget -q -O - http://169.254.169.254/latest/meta-data/public-hostname`
+else
+  IS_MASTER=false
+fi
+
+HADOOP_VERSION=${HADOOP_VERSION:-0.20.1}
+HADOOP_HOME=/usr/local/hadoop-$HADOOP_VERSION
+HADOOP_CONF_DIR=$HADOOP_HOME/conf
+
+function register_auto_shutdown() {
+  if [ ! -z "$AUTO_SHUTDOWN" ]; then
+    shutdown -h +$AUTO_SHUTDOWN >/dev/null &
+  fi
+}
+
+# Install a list of packages on debian or redhat as appropriate
+function install_packages() {
+  if which dpkg &> /dev/null; then
+    apt-get update
+    apt-get -y install $@
+  elif which rpm &> /dev/null; then
+    yum install -y $@
+  else
+    echo "No package manager found."
+  fi
+}
+
+# Install any user packages specified in the USER_PACKAGES environment variable
+function install_user_packages() {
+  if [ ! -z "$USER_PACKAGES" ]; then
+    install_packages $USER_PACKAGES
+  fi
+}
+
+function install_hadoop() {
+  useradd hadoop
+
+  hadoop_tar_url=http://s3.amazonaws.com/hadoop-releases/core/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz
+  hadoop_tar_file=`basename $hadoop_tar_url`
+  hadoop_tar_md5_file=`basename $hadoop_tar_url.md5`
+
+  curl="curl --retry 3 --silent --show-error --fail"
+  for i in `seq 1 3`;
+  do
+    $curl -O $hadoop_tar_url
+    $curl -O $hadoop_tar_url.md5
+    if md5sum -c $hadoop_tar_md5_file; then
+      break;
+    else
+      rm -f $hadoop_tar_file $hadoop_tar_md5_file
+    fi
+  done
+
+  if [ ! -e $hadoop_tar_file ]; then
+    echo "Failed to download $hadoop_tar_url. Aborting."
+    exit 1
+  fi
+
+  tar zxf $hadoop_tar_file -C /usr/local
+  rm -f $hadoop_tar_file $hadoop_tar_md5_file
+
+  echo "export HADOOP_HOME=$HADOOP_HOME" >> ~root/.bashrc
+  echo 'export PATH=$JAVA_HOME/bin:$HADOOP_HOME/bin:$PATH' >> ~root/.bashrc
+}
+
+function prep_disk() {
+  mount=$1
+  device=$2
+  automount=${3:-false}
+
+  echo "warning: ERASING CONTENTS OF $device"
+  mkfs.xfs -f $device
+  if [ ! -e $mount ]; then
+    mkdir $mount
+  fi
+  mount -o defaults,noatime $device $mount
+  if $automount ; then
+    echo "$device $mount xfs defaults,noatime 0 0" >> /etc/fstab
+  fi
+}
+
+function wait_for_mount {
+  mount=$1
+  device=$2
+
+  mkdir $mount
+
+  i=1
+  echo "Attempting to mount $device"
+  while true ; do
+    sleep 10
+    echo -n "$i "
+    i=$[$i+1]
+    mount -o defaults,noatime $device $mount || continue
+    echo " Mounted."
+    break;
+  done
+}
+
+function make_hadoop_dirs {
+  for mount in "$@"; do
+    if [ ! -e $mount/hadoop ]; then
+      mkdir -p $mount/hadoop
+      chown hadoop:hadoop $mount/hadoop
+    fi
+  done
+}
+
+# Configure Hadoop by setting up disks and site file
+function configure_hadoop() {
+
+  install_packages xfsprogs # needed for XFS
+
+  INSTANCE_TYPE=`wget -q -O - http://169.254.169.254/latest/meta-data/instance-type`
+
+  if [ -n "$EBS_MAPPINGS" ]; then
+    # EBS_MAPPINGS is like "/ebs1,/dev/sdj;/ebs2,/dev/sdk"
+    DFS_NAME_DIR=''
+    FS_CHECKPOINT_DIR=''
+    DFS_DATA_DIR=''
+    for mapping in $(echo "$EBS_MAPPINGS" | tr ";" "\n"); do
+      # Split on the comma (see "Parameter Expansion" in the bash man page)
+      mount=${mapping%,*}
+      device=${mapping#*,}
+      wait_for_mount $mount $device
+      DFS_NAME_DIR=${DFS_NAME_DIR},"$mount/hadoop/hdfs/name"
+      FS_CHECKPOINT_DIR=${FS_CHECKPOINT_DIR},"$mount/hadoop/hdfs/secondary"
+      DFS_DATA_DIR=${DFS_DATA_DIR},"$mount/hadoop/hdfs/data"
+      FIRST_MOUNT=${FIRST_MOUNT-$mount}
+      make_hadoop_dirs $mount
+    done
+    # Remove leading commas
+    DFS_NAME_DIR=${DFS_NAME_DIR#?}
+    FS_CHECKPOINT_DIR=${FS_CHECKPOINT_DIR#?}
+    DFS_DATA_DIR=${DFS_DATA_DIR#?}
+
+    DFS_REPLICATION=3 # EBS is internally replicated, but we also use HDFS replication for safety
+  else
+    case $INSTANCE_TYPE in
+    m1.xlarge|c1.xlarge)
+      DFS_NAME_DIR=/mnt/hadoop/hdfs/name,/mnt2/hadoop/hdfs/name
+      FS_CHECKPOINT_DIR=/mnt/hadoop/hdfs/secondary,/mnt2/hadoop/hdfs/secondary
+      DFS_DATA_DIR=/mnt/hadoop/hdfs/data,/mnt2/hadoop/hdfs/data,/mnt3/hadoop/hdfs/data,/mnt4/hadoop/hdfs/data
+      ;;
+    m1.large)
+      DFS_NAME_DIR=/mnt/hadoop/hdfs/name,/mnt2/hadoop/hdfs/name
+      FS_CHECKPOINT_DIR=/mnt/hadoop/hdfs/secondary,/mnt2/hadoop/hdfs/secondary
+      DFS_DATA_DIR=/mnt/hadoop/hdfs/data,/mnt2/hadoop/hdfs/data
+      ;;
+    *)
+      # "m1.small" or "c1.medium"
+      DFS_NAME_DIR=/mnt/hadoop/hdfs/name
+      FS_CHECKPOINT_DIR=/mnt/hadoop/hdfs/secondary
+      DFS_DATA_DIR=/mnt/hadoop/hdfs/data
+      ;;
+    esac
+    FIRST_MOUNT=/mnt
+    DFS_REPLICATION=3
+  fi
+
+  case $INSTANCE_TYPE in
+  m1.xlarge|c1.xlarge)
+    prep_disk /mnt2 /dev/sdc true &
+    disk2_pid=$!
+    prep_disk /mnt3 /dev/sdd true &
+    disk3_pid=$!
+    prep_disk /mnt4 /dev/sde true &
+    disk4_pid=$!
+    wait $disk2_pid $disk3_pid $disk4_pid
+    MAPRED_LOCAL_DIR=/mnt/hadoop/mapred/local,/mnt2/hadoop/mapred/local,/mnt3/hadoop/mapred/local,/mnt4/hadoop/mapred/local
+    MAX_MAP_TASKS=8
+    MAX_REDUCE_TASKS=4
+    CHILD_OPTS=-Xmx680m
+    CHILD_ULIMIT=1392640
+    ;;
+  m1.large)
+    prep_disk /mnt2 /dev/sdc true
+    MAPRED_LOCAL_DIR=/mnt/hadoop/mapred/local,/mnt2/hadoop/mapred/local
+    MAX_MAP_TASKS=4
+    MAX_REDUCE_TASKS=2
+    CHILD_OPTS=-Xmx1024m
+    CHILD_ULIMIT=2097152
+    ;;
+  c1.medium)
+    MAPRED_LOCAL_DIR=/mnt/hadoop/mapred/local
+    MAX_MAP_TASKS=4
+    MAX_REDUCE_TASKS=2
+    CHILD_OPTS=-Xmx550m
+    CHILD_ULIMIT=1126400
+    ;;
+  *)
+    # "m1.small"
+    MAPRED_LOCAL_DIR=/mnt/hadoop/mapred/local
+    MAX_MAP_TASKS=2
+    MAX_REDUCE_TASKS=1
+    CHILD_OPTS=-Xmx550m
+    CHILD_ULIMIT=1126400
+    ;;
+  esac
+
+  make_hadoop_dirs `ls -d /mnt*`
+
+  # Create tmp directory
+  mkdir /mnt/tmp
+  chmod a+rwxt /mnt/tmp
+
+  ##############################################################################
+  # Modify this section to customize your Hadoop cluster.
+  ##############################################################################
+  cat > $HADOOP_CONF_DIR/hadoop-site.xml <<EOF
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<configuration>
+<property>
+  <name>dfs.block.size</name>
+  <value>134217728</value>
+  <final>true</final>
+</property>
+<property>
+  <name>dfs.data.dir</name>
+  <value>$DFS_DATA_DIR</value>
+  <final>true</final>
+</property>
+<property>
+  <name>dfs.datanode.du.reserved</name>
+  <value>1073741824</value>
+  <final>true</final>
+</property>
+<property>
+  <name>dfs.datanode.handler.count</name>
+  <value>3</value>
+  <final>true</final>
+</property>
+<!--property>
+  <name>dfs.hosts</name>
+  <value>$HADOOP_CONF_DIR/dfs.hosts</value>
+  <final>true</final>
+</property-->
+<!--property>
+  <name>dfs.hosts.exclude</name>
+  <value>$HADOOP_CONF_DIR/dfs.hosts.exclude</value>
+  <final>true</final>
+</property-->
+<property>
+  <name>dfs.name.dir</name>
+  <value>$DFS_NAME_DIR</value>
+  <final>true</final>
+</property>
+<property>
+  <name>dfs.namenode.handler.count</name>
+  <value>5</value>
+  <final>true</final>
+</property>
+<property>
+  <name>dfs.permissions</name>
+  <value>true</value>
+  <final>true</final>
+</property>
+<property>
+  <name>dfs.replication</name>
+  <value>$DFS_REPLICATION</value>
+</property>
+<property>
+  <name>fs.checkpoint.dir</name>
+  <value>$FS_CHECKPOINT_DIR</value>
+  <final>true</final>
+</property>
+<property>
+  <name>fs.default.name</name>
+  <value>hdfs://$MASTER_HOST:8020/</value>
+</property>
+<property>
+  <name>fs.trash.interval</name>
+  <value>1440</value>
+  <final>true</final>
+</property>
+<property>
+  <name>hadoop.tmp.dir</name>
+  <value>/mnt/tmp/hadoop-\${user.name}</value>
+  <final>true</final>
+</property>
+<property>
+  <name>io.file.buffer.size</name>
+  <value>65536</value>
+</property>
+<property>
+  <name>mapred.child.java.opts</name>
+  <value>$CHILD_OPTS</value>
+</property>
+<property>
+  <name>mapred.child.ulimit</name>
+  <value>$CHILD_ULIMIT</value>
+  <final>true</final>
+</property>
+<property>
+  <name>mapred.job.tracker</name>
+  <value>$MASTER_HOST:8021</value>
+</property>
+<property>
+  <name>mapred.job.tracker.handler.count</name>
+  <value>5</value>
+  <final>true</final>
+</property>
+<property>
+  <name>mapred.local.dir</name>
+  <value>$MAPRED_LOCAL_DIR</value>
+  <final>true</final>
+</property>
+<property>
+  <name>mapred.map.tasks.speculative.execution</name>
+  <value>true</value>
+</property>
+<property>
+  <name>mapred.reduce.parallel.copies</name>
+  <value>10</value>
+</property>
+<property>
+  <name>mapred.reduce.tasks</name>
+  <value>10</value>
+</property>
+<property>
+  <name>mapred.reduce.tasks.speculative.execution</name>
+  <value>false</value>
+</property>
+<property>
+  <name>mapred.submit.replication</name>
+  <value>10</value>
+</property>
+<property>
+  <name>mapred.system.dir</name>
+  <value>/hadoop/system/mapred</value>
+</property>
+<property>
+  <name>mapred.tasktracker.map.tasks.maximum</name>
+  <value>$MAX_MAP_TASKS</value>
+  <final>true</final>
+</property>
+<property>
+  <name>mapred.tasktracker.reduce.tasks.maximum</name>
+  <value>$MAX_REDUCE_TASKS</value>
+  <final>true</final>
+</property>
+<property>
+  <name>tasktracker.http.threads</name>
+  <value>46</value>
+  <final>true</final>
+</property>
+<property>
+  <name>mapred.compress.map.output</name>
+  <value>true</value>
+</property>
+<property>
+  <name>mapred.output.compression.type</name>
+  <value>BLOCK</value>
+</property>
+<property>
+  <name>hadoop.rpc.socket.factory.class.default</name>
+  <value>org.apache.hadoop.net.StandardSocketFactory</value>
+  <final>true</final>
+</property>
+<property>
+  <name>hadoop.rpc.socket.factory.class.ClientProtocol</name>
+  <value></value>
+  <final>true</final>
+</property>
+<property>
+  <name>hadoop.rpc.socket.factory.class.JobSubmissionProtocol</name>
+  <value></value>
+  <final>true</final>
+</property>
+<property>
+  <name>io.compression.codecs</name>
+  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec</value>
+</property>
+<property>
+  <name>fs.s3.awsAccessKeyId</name>
+  <value>$AWS_ACCESS_KEY_ID</value>
+</property>
+<property>
+  <name>fs.s3.awsSecretAccessKey</name>
+  <value>$AWS_SECRET_ACCESS_KEY</value>
+</property>
+<property>
+  <name>fs.s3n.awsAccessKeyId</name>
+  <value>$AWS_ACCESS_KEY_ID</value>
+</property>
+<property>
+  <name>fs.s3n.awsSecretAccessKey</name>
+  <value>$AWS_SECRET_ACCESS_KEY</value>
+</property>
+</configuration>
+EOF
+
+  # Keep PID files in a non-temporary directory
+  sed -i -e "s|# export HADOOP_PID_DIR=.*|export HADOOP_PID_DIR=/var/run/hadoop|" \
+    $HADOOP_CONF_DIR/hadoop-env.sh
+  mkdir -p /var/run/hadoop
+  chown -R hadoop:hadoop /var/run/hadoop
+
+  # Set SSH options within the cluster
+  sed -i -e 's|# export HADOOP_SSH_OPTS=.*|export HADOOP_SSH_OPTS="-o StrictHostKeyChecking=no"|' \
+    $HADOOP_CONF_DIR/hadoop-env.sh
+
+  # Hadoop logs should be on the /mnt partition
+  sed -i -e 's|# export HADOOP_LOG_DIR=.*|export HADOOP_LOG_DIR=/var/log/hadoop/logs|' \
+    $HADOOP_CONF_DIR/hadoop-env.sh
+  rm -rf /var/log/hadoop
+  mkdir /mnt/hadoop/logs
+  chown hadoop:hadoop /mnt/hadoop/logs
+  ln -s /mnt/hadoop/logs /var/log/hadoop
+  chown -R hadoop:hadoop /var/log/hadoop
+
+}
+
+# Sets up small website on cluster.
+function setup_web() {
+
+  if which dpkg &> /dev/null; then
+    apt-get -y install thttpd
+    WWW_BASE=/var/www
+  elif which rpm &> /dev/null; then
+    yum install -y thttpd
+    chkconfig --add thttpd
+    WWW_BASE=/var/www/thttpd/html
+  fi
+
+  cat > $WWW_BASE/index.html << END
+<html>
+<head>
+<title>Hadoop EC2 Cluster</title>
+</head>
+<body>
+<h1>Hadoop EC2 Cluster</h1>
+To browse the cluster you need to have a proxy configured.
+Start the proxy with <tt>hadoop-ec2 proxy &lt;cluster_name&gt;</tt>,
+and point your browser to
+<a href="http://apache-hadoop-ec2.s3.amazonaws.com/proxy.pac">this Proxy
+Auto-Configuration (PAC)</a> file.  To manage multiple proxy configurations,
+you may wish to use
+<a href="https://addons.mozilla.org/en-US/firefox/addon/2464">FoxyProxy</a>.
+<ul>
+<li><a href="http://$MASTER_HOST:50070/">NameNode</a>
+<li><a href="http://$MASTER_HOST:50030/">JobTracker</a>
+</ul>
+</body>
+</html>
+END
+
+  service thttpd start
+
+}
+
+function start_hadoop_master() {
+  if which dpkg &> /dev/null; then
+    AS_HADOOP="su -s /bin/bash - hadoop -c"
+  elif which rpm &> /dev/null; then
+    AS_HADOOP="/sbin/runuser -s /bin/bash - hadoop -c"
+  fi
+
+  # Format HDFS
+  [ ! -e $FIRST_MOUNT/hadoop/hdfs ] && $AS_HADOOP "$HADOOP_HOME/bin/hadoop namenode -format"
+
+  $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start namenode"
+  $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start secondarynamenode"
+  $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start jobtracker"
+
+  $AS_HADOOP "$HADOOP_HOME/bin/hadoop dfsadmin -safemode wait"
+  $AS_HADOOP "$HADOOP_HOME/bin/hadoop fs -mkdir /user"
+  # The following is questionable, as it allows a user to delete another user
+  # It's needed to allow users to create their own user directories
+  $AS_HADOOP "$HADOOP_HOME/bin/hadoop fs -chmod +w /user"
+
+}
+
+function start_hadoop_slave() {
+  if which dpkg &> /dev/null; then
+    AS_HADOOP="su -s /bin/bash - hadoop -c"
+  elif which rpm &> /dev/null; then
+    AS_HADOOP="/sbin/runuser -s /bin/bash - hadoop -c"
+  fi
+
+  $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start datanode"
+  $AS_HADOOP "$HADOOP_HOME/bin/hadoop-daemon.sh start tasktracker"
+}
+
+register_auto_shutdown
+install_user_packages
+install_hadoop
+configure_hadoop
+
+if $IS_MASTER ; then
+  setup_web
+  start_hadoop_master
+else
+  start_hadoop_slave
+fi

+ 14 - 0
src/contrib/cloud/src/py/hadoop/__init__.py

@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+ 14 - 0
src/contrib/cloud/src/py/hadoop/cloud/__init__.py

@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+ 456 - 0
src/contrib/cloud/src/py/hadoop/cloud/cli.py

@@ -0,0 +1,456 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import with_statement
+
+import ConfigParser
+import hadoop.cloud.commands as commands
+from hadoop.cloud.cluster import get_cluster
+from hadoop.cloud.cluster import TimeoutException
+from hadoop.cloud.providers.ec2 import Ec2Storage
+from hadoop.cloud.util import merge_config_with_options
+from hadoop.cloud.util import xstr
+import logging
+from optparse import OptionParser
+from optparse import make_option
+import os
+import subprocess
+import sys
+
+version_file = os.path.join(sys.path[0], "VERSION")
+VERSION = open(version_file, "r").read().strip()
+
+DEFAULT_CLOUD_PROVIDER = 'ec2'
+
+DEFAULT_CONFIG_DIR_NAME = '.hadoop-cloud'
+DEFAULT_CONFIG_DIR = os.path.join(os.environ['HOME'], DEFAULT_CONFIG_DIR_NAME)
+CONFIG_FILENAME = 'clusters.cfg'
+
+CONFIG_DIR_OPTION = \
+  make_option("--config-dir", metavar="CONFIG-DIR",
+    help="The configuration directory.")
+
+PROVIDER_OPTION = \
+  make_option("--cloud-provider", metavar="PROVIDER",
+    help="The cloud provider, e.g. 'ec2' for Amazon EC2.")
+
+BASIC_OPTIONS = [
+  CONFIG_DIR_OPTION,
+  PROVIDER_OPTION,
+]
+
+LAUNCH_OPTIONS = [
+  CONFIG_DIR_OPTION,
+  PROVIDER_OPTION,
+  make_option("-a", "--ami", metavar="AMI",
+    help="The AMI ID of the image to launch. (Amazon EC2 only. Deprecated, use \
+--image-id.)"),
+  make_option("-e", "--env", metavar="ENV", action="append",
+    help="An environment variable to pass to instances. \
+(May be specified multiple times.)"),
+  make_option("-f", "--user-data-file", metavar="URL",
+    help="The URL of the file containing user data to be made available to \
+instances."),
+  make_option("--image-id", metavar="ID",
+    help="The ID of the image to launch."),
+  make_option("-k", "--key-name", metavar="KEY-PAIR",
+    help="The key pair to use when launching instances. (Amazon EC2 only.)"),
+  make_option("-p", "--user-packages", metavar="PACKAGES",
+    help="A space-separated list of packages to install on instances on start \
+up."),
+  make_option("-t", "--instance-type", metavar="TYPE",
+    help="The type of instance to be launched. One of m1.small, m1.large, \
+m1.xlarge, c1.medium, or c1.xlarge."),
+  make_option("-z", "--availability-zone", metavar="ZONE",
+    help="The availability zone to run the instances in."),
+  make_option("--auto-shutdown", metavar="TIMEOUT_MINUTES",
+    help="The time in minutes after launch when an instance will be \
+automatically shut down."),
+  make_option("--client-cidr", metavar="CIDR", action="append",
+    help="The CIDR of the client, which is used to allow access through the \
+firewall to the master node. (May be specified multiple times.)"),
+  make_option("--public-key", metavar="FILE",
+    help="The public key to authorize on launching instances. (Non-EC2 \
+providers only.)"),
+]
+
+SNAPSHOT_OPTIONS = [
+  CONFIG_DIR_OPTION,
+  PROVIDER_OPTION,
+  make_option("-k", "--key-name", metavar="KEY-PAIR",
+    help="The key pair to use when launching instances."),
+  make_option("-z", "--availability-zone", metavar="ZONE",
+    help="The availability zone to run the instances in."),
+  make_option("--ssh-options", metavar="SSH-OPTIONS",
+    help="SSH options to use."),
+]
+
+PLACEMENT_OPTIONS = [
+  CONFIG_DIR_OPTION,
+  PROVIDER_OPTION,
+  make_option("-z", "--availability-zone", metavar="ZONE",
+    help="The availability zone to run the instances in."),
+]
+
+FORCE_OPTIONS = [
+  CONFIG_DIR_OPTION,
+  PROVIDER_OPTION,
+  make_option("--force", action="store_true", default=False,
+  help="Do not ask for confirmation."),
+]
+
+SSH_OPTIONS = [
+  CONFIG_DIR_OPTION,
+  PROVIDER_OPTION,
+  make_option("--ssh-options", metavar="SSH-OPTIONS",
+    help="SSH options to use."),
+]
+
+def print_usage(script):
+  print """Usage: %(script)s COMMAND [OPTIONS]
+where COMMAND and [OPTIONS] may be one of:
+  list [CLUSTER]                      list all running Hadoop clusters
+                                        or instances in CLUSTER
+  launch-master CLUSTER               launch or find a master in CLUSTER
+  launch-slaves CLUSTER NUM_SLAVES    launch NUM_SLAVES slaves in CLUSTER
+  launch-cluster CLUSTER NUM_SLAVES   launch a master and NUM_SLAVES slaves
+                                        in CLUSTER
+  create-formatted-snapshot CLUSTER   create an empty, formatted snapshot of
+    SIZE                                size SIZE GiB
+  list-storage CLUSTER                list storage volumes for CLUSTER
+  create-storage CLUSTER ROLE         create volumes for NUM_INSTANCES instances
+    NUM_INSTANCES SPEC_FILE             in ROLE for CLUSTER, using SPEC_FILE
+  attach-storage ROLE                 attach storage volumes for ROLE to CLUSTER
+  login CLUSTER                       log in to the master in CLUSTER over SSH
+  proxy CLUSTER                       start a SOCKS proxy on localhost into the
+                                        CLUSTER
+  push CLUSTER FILE                   scp FILE to the master in CLUSTER
+  exec CLUSTER CMD                    execute CMD on the master in CLUSTER
+  terminate-cluster CLUSTER           terminate all instances in CLUSTER
+  delete-cluster CLUSTER              delete the group information for CLUSTER
+  delete-storage CLUSTER              delete all storage volumes for CLUSTER
+  update-slaves-file CLUSTER          update the slaves file on the CLUSTER
+                                        master
+
+Use %(script)s COMMAND --help to see additional options for specific commands.
+""" % locals()
+
+def parse_options_and_config(command, option_list=[], extra_arguments=(),
+                             unbounded_args=False):
+  """
+  Parse the arguments to command using the given option list, and combine with
+  any configuration parameters.
+
+  If unbounded_args is true then there must be at least as many extra arguments
+  as specified by extra_arguments (the first argument is always CLUSTER).
+  Otherwise there must be exactly the same number of arguments as
+  extra_arguments.
+  """
+  expected_arguments = ["CLUSTER",]
+  expected_arguments.extend(extra_arguments)
+  (options_dict, args) = parse_options(command, option_list, expected_arguments,
+                                       unbounded_args)
+
+  config_dir = get_config_dir(options_dict)
+  config_files = [os.path.join(config_dir, CONFIG_FILENAME)]
+  if 'config_dir' not in options_dict:
+    # if config_dir not set, then also search in current directory
+    config_files.insert(0, CONFIG_FILENAME)
+
+  config = ConfigParser.ConfigParser()
+  read_files = config.read(config_files)
+  logging.debug("Read %d configuration files: %s", len(read_files),
+                ", ".join(read_files))
+  cluster_name = args[0]
+  opt = merge_config_with_options(cluster_name, config, options_dict)
+  logging.debug("Options: %s", str(opt))
+  return (opt, args, get_cluster(get_cloud_provider(opt))(cluster_name,
+                                                          config_dir))
+
+def parse_options(command, option_list=[], expected_arguments=(),
+                  unbounded_args=False):
+  """
+  Parse the arguments to command using the given option list.
+
+  If unbounded_args is true then there must be at least as many extra arguments
+  as specified by extra_arguments (the first argument is always CLUSTER).
+  Otherwise there must be exactly the same number of arguments as
+  extra_arguments.
+  """
+
+  config_file_name = "%s/%s" % (DEFAULT_CONFIG_DIR_NAME, CONFIG_FILENAME)
+  usage = """%%prog %s [options] %s
+
+Options may also be specified in a configuration file called
+%s located in the user's home directory.
+Options specified on the command line take precedence over any in the
+configuration file.""" % (command, " ".join(expected_arguments),
+                          config_file_name)
+  parser = OptionParser(usage=usage, version="%%prog %s" % VERSION,
+                        option_list=option_list)
+  parser.disable_interspersed_args()
+  (options, args) = parser.parse_args(sys.argv[2:])
+  if unbounded_args:
+    if len(args) < len(expected_arguments):
+      parser.error("incorrect number of arguments")
+  elif len(args) != len(expected_arguments):
+    parser.error("incorrect number of arguments")
+  return (vars(options), args)
+
+def get_config_dir(options_dict):
+  config_dir = options_dict.get('config_dir')
+  if not config_dir:
+    config_dir = DEFAULT_CONFIG_DIR
+  return config_dir
+
+def get_cloud_provider(options_dict):
+  provider = options_dict.get("cloud_provider", None)
+  if provider is None:
+    provider = DEFAULT_CLOUD_PROVIDER
+  return provider
+
+def check_options_set(options, option_names):
+  for option_name in option_names:
+    if options.get(option_name) is None:
+      print "Option '%s' is missing. Aborting." % option_name
+      sys.exit(1)
+
+def check_launch_options_set(cluster, options):
+  if cluster.get_provider_code() == 'ec2':
+    if options.get('ami') is None and options.get('image_id') is None:
+      print "One of ami or image_id must be specified. Aborting."
+      sys.exit(1)
+    check_options_set(options, ['key_name'])
+  else:
+    check_options_set(options, ['image_id', 'public_key'])
+
+def get_image_id(cluster, options):
+  if cluster.get_provider_code() == 'ec2':
+    return options.get('image_id', options.get('ami'))
+  else:
+    return options.get('image_id')
+
+def _prompt(prompt):
+  """ Returns true if user responds "yes" to prompt. """
+  return raw_input("%s [yes or no]: " % prompt).lower() == "yes"
+
+def main():
+  # Use HADOOP_CLOUD_LOGGING_LEVEL=DEBUG to enable debugging output.
+  logging.basicConfig(level=getattr(logging,
+                                    os.getenv("HADOOP_CLOUD_LOGGING_LEVEL",
+                                              "INFO")))
+
+  if len(sys.argv) < 2:
+    print_usage(sys.argv[0])
+    sys.exit(1)
+
+  command = sys.argv[1]
+
+  if command == 'list':
+    (opt, args) = parse_options(command, BASIC_OPTIONS, unbounded_args=True)
+    if len(args) == 0:
+      commands.list_all(get_cloud_provider(opt))
+    else:
+      (opt, args, cluster) = parse_options_and_config(command, BASIC_OPTIONS)
+      commands.list_cluster(cluster)
+
+  elif command == 'launch-master':
+    (opt, args, cluster) = parse_options_and_config(command, LAUNCH_OPTIONS)
+    check_launch_options_set(cluster, opt)
+    config_dir = get_config_dir(opt)
+    commands.launch_master(cluster, config_dir, get_image_id(cluster, opt),
+      opt.get('instance_type'),
+      opt.get('key_name'), opt.get('public_key'), opt.get('user_data_file'),
+      opt.get('availability_zone'), opt.get('user_packages'),
+      opt.get('auto_shutdown'), opt.get('env'), opt.get('client_cidr'))
+    commands.attach_storage(cluster, (commands.MASTER,))
+    try:
+      commands.wait_for_hadoop(cluster, 0)
+    except TimeoutException:
+      print "Timeout while waiting for Hadoop to start. Please check logs on" +\
+        " master."
+    commands.print_master_url(cluster)
+
+  elif command == 'launch-slaves':
+    (opt, args, cluster) = parse_options_and_config(command, LAUNCH_OPTIONS,
+                                                    ("NUM_SLAVES",))
+    number_of_slaves = int(args[1])
+    check_launch_options_set(cluster, opt)
+    config_dir = get_config_dir(opt)
+    commands.launch_slaves(cluster, number_of_slaves, get_image_id(cluster, opt),
+      opt.get('instance_type'),
+      opt.get('key_name'), opt.get('public_key'), opt.get('user_data_file'),
+      opt.get('availability_zone'), opt.get('user_packages'),
+      opt.get('auto_shutdown'), opt.get('env'))
+    commands.attach_storage(cluster, (commands.SLAVE,))
+    commands.print_master_url(cluster)
+
+  elif command == 'launch-cluster':
+    (opt, args, cluster) = parse_options_and_config(command, LAUNCH_OPTIONS,
+                                                    ("NUM_SLAVES",))
+    number_of_slaves = int(args[1])
+    check_launch_options_set(cluster, opt)
+    config_dir = get_config_dir(opt)
+    commands.launch_master(cluster, config_dir, get_image_id(cluster, opt),
+      opt.get('instance_type'),
+      opt.get('key_name'), opt.get('public_key'), opt.get('user_data_file'),
+      opt.get('availability_zone'), opt.get('user_packages'),
+      opt.get('auto_shutdown'), opt.get('env'), opt.get('client_cidr'))
+    commands.launch_slaves(cluster, number_of_slaves, get_image_id(cluster, opt),
+      opt.get('instance_type'),
+      opt.get('key_name'), opt.get('public_key'), opt.get('user_data_file'),
+      opt.get('availability_zone'), opt.get('user_packages'),
+      opt.get('auto_shutdown'), opt.get('env'))
+    commands.attach_storage(cluster, commands.ROLES)
+    try:
+      commands.wait_for_hadoop(cluster, number_of_slaves)
+    except TimeoutException:
+      print "Timeout while waiting for Hadoop to start. Please check logs on" +\
+        " cluster."
+    commands.print_master_url(cluster)
+
+  elif command == 'login':
+    (opt, args, cluster) = parse_options_and_config(command, SSH_OPTIONS)
+    instances = cluster.check_running(commands.MASTER, 1)
+    if not instances:
+      sys.exit(1)
+    subprocess.call('ssh %s root@%s' % \
+                    (xstr(opt.get('ssh_options')), instances[0].public_ip),
+                    shell=True)
+
+  elif command == 'proxy':
+    (opt, args, cluster) = parse_options_and_config(command, SSH_OPTIONS)
+    instances = cluster.check_running(commands.MASTER, 1)
+    if not instances:
+      sys.exit(1)
+    options = '-o "ConnectTimeout 10" -o "ServerAliveInterval 60" ' \
+              '-N -D 6666'
+    process = subprocess.Popen('ssh %s %s root@%s' %
+      (xstr(opt.get('ssh_options')), options, instances[0].public_ip),
+      stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+      shell=True)
+    print """export HADOOP_CLOUD_PROXY_PID=%s;
+echo Proxy pid %s;""" % (process.pid, process.pid)
+
+  elif command == 'push':
+    (opt, args, cluster) = parse_options_and_config(command, SSH_OPTIONS,
+                                                    ("FILE",))
+    instances = cluster.check_running(commands.MASTER, 1)
+    if not instances:
+      sys.exit(1)
+    subprocess.call('scp %s -r %s root@%s:' % (xstr(opt.get('ssh_options')),
+                                               args[1], instances[0].public_ip),
+                                               shell=True)
+
+  elif command == 'exec':
+    (opt, args, cluster) = parse_options_and_config(command, SSH_OPTIONS,
+                                                    ("CMD",), True)
+    instances = cluster.check_running(commands.MASTER, 1)
+    if not instances:
+      sys.exit(1)
+    subprocess.call("ssh %s root@%s '%s'" % (xstr(opt.get('ssh_options')),
+                                             instances[0].public_ip,
+                                             " ".join(args[1:])), shell=True)
+
+  elif command == 'terminate-cluster':
+    (opt, args, cluster) = parse_options_and_config(command, FORCE_OPTIONS)
+    cluster.print_status(commands.ROLES)
+    if not opt["force"] and not _prompt("Terminate all instances?"):
+      print "Not terminating cluster."
+    else:
+      print "Terminating cluster"
+      cluster.terminate()
+
+  elif command == 'delete-cluster':
+    (opt, args, cluster) = parse_options_and_config(command, BASIC_OPTIONS)
+    cluster.delete()
+
+  elif command == 'create-formatted-snapshot':
+    (opt, args, cluster) = parse_options_and_config(command, SNAPSHOT_OPTIONS,
+                                                    ("SIZE",))
+    size = int(args[1])
+    check_options_set(opt, ['availability_zone', 'key_name'])
+    ami_ubuntu_intrepid_x86 = 'ami-ec48af85' # use a general AMI
+    Ec2Storage.create_formatted_snapshot(cluster, size,
+                                         opt.get('availability_zone'),
+                                         ami_ubuntu_intrepid_x86,
+                                         opt.get('key_name'),
+                                         xstr(opt.get('ssh_options')))
+
+  elif command == 'list-storage':
+    (opt, args, cluster) = parse_options_and_config(command, BASIC_OPTIONS)
+    storage = cluster.get_storage()
+    storage.print_status(commands.ROLES)
+
+  elif command == 'create-storage':
+    (opt, args, cluster) = parse_options_and_config(command, PLACEMENT_OPTIONS,
+                                                    ("ROLE", "NUM_INSTANCES",
+                                                     "SPEC_FILE"))
+    storage = cluster.get_storage()
+    role = args[1]
+    number_of_instances = int(args[2])
+    spec_file = args[3]
+    check_options_set(opt, ['availability_zone'])
+    storage.create(role, number_of_instances, opt.get('availability_zone'),
+                   spec_file)
+    storage.print_status(commands.ROLES)
+
+  elif command == 'attach-storage':
+    (opt, args, cluster) = parse_options_and_config(command, BASIC_OPTIONS,
+                                                    ("ROLE",))
+    storage = cluster.get_storage()
+    role = args[1]
+    storage.attach(role, cluster.get_instances_in_role(role, 'running'))
+    storage.print_status(commands.ROLES)
+
+  elif command == 'delete-storage':
+    (opt, args, cluster) = parse_options_and_config(command, FORCE_OPTIONS)
+    storage = cluster.get_storage()
+    storage.print_status(commands.ROLES)
+    if not opt["force"] and not _prompt("Delete all storage volumes? THIS WILL \
+      PERMANENTLY DELETE ALL DATA"):
+      print "Not deleting storage volumes."
+    else:
+      print "Deleting storage"
+      for role in commands.ROLES:
+        storage.delete(role)
+
+  elif command == 'update-slaves-file':
+    (opt, args, cluster) = parse_options_and_config(command, SSH_OPTIONS)
+    check_options_set(opt, ['private_key'])
+    ssh_options = xstr(opt.get('ssh_options'))
+    instances = cluster.check_running(commands.MASTER, 1)
+    if not instances:
+      sys.exit(1)
+    master = instances[0]
+    slaves = cluster.get_instances_in_role(commands.SLAVE)
+    with open('slaves', 'w') as f:
+      for slave in slaves:
+        f.write(slave.public_ip + "\n")
+    subprocess.call('scp %s -r %s root@%s:/etc/hadoop/conf' % \
+                    (ssh_options, 'slaves', master.public_ip), shell=True)
+
+    # Copy private key
+    private_key = opt.get('private_key')
+    subprocess.call('scp %s -r %s root@%s:/root/.ssh/id_rsa' % \
+                    (ssh_options, private_key, master.public_ip), shell=True)
+    for slave in slaves:
+      subprocess.call('scp %s -r %s root@%s:/root/.ssh/id_rsa' % \
+                      (ssh_options, private_key, slave.public_ip), shell=True)
+
+  else:
+    print "Unrecognized command '%s'" % command
+    print_usage(sys.argv[0])
+    sys.exit(1)

+ 186 - 0
src/contrib/cloud/src/py/hadoop/cloud/cluster.py

@@ -0,0 +1,186 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Classes for controlling a cluster of cloud instances.
+"""
+
+from __future__ import with_statement
+
+import gzip
+import StringIO
+import urllib
+
+from hadoop.cloud.storage import Storage
+
+CLUSTER_PROVIDER_MAP = {
+  "dummy": ('hadoop.cloud.providers.dummy', 'DummyCluster'),
+  "ec2": ('hadoop.cloud.providers.ec2', 'Ec2Cluster'),
+}
+
+def get_cluster(provider):
+  """
+  Retrieve the Cluster class for a provider.
+  """
+  mod_name, driver_name = CLUSTER_PROVIDER_MAP[provider]
+  _mod = __import__(mod_name, globals(), locals(), [driver_name])
+  return getattr(_mod, driver_name)
+
+class Cluster(object):
+  """
+  A cluster of server instances. A cluster has a unique name.
+  One may launch instances which run in a certain role.
+  """
+
+  def __init__(self, name, config_dir):
+    self.name = name
+    self.config_dir = config_dir
+
+  def get_provider_code(self):
+    """
+    The code that uniquely identifies the cloud provider.
+    """
+    raise Exception("Unimplemented")
+
+  def authorize_role(self, role, from_port, to_port, cidr_ip):
+    """
+    Authorize access to machines in a given role from a given network.
+    """
+    pass
+
+  def get_instances_in_role(self, role, state_filter=None):
+    """
+    Get all the instances in a role, filtered by state.
+
+    @param role: the name of the role
+    @param state_filter: the state that the instance should be in
+    (e.g. "running"), or None for all states
+    """
+    raise Exception("Unimplemented")
+
+  def print_status(self, roles, state_filter="running"):
+    """
+    Print the status of instances in the given roles, filtered by state.
+    """
+    pass
+
+  def check_running(self, role, number):
+    """
+    Check that a certain number of instances in a role are running.
+    """
+    instances = self.get_instances_in_role(role, "running")
+    if len(instances) != number:
+      print "Expected %s instances in role %s, but was %s %s" % \
+        (number, role, len(instances), instances)
+      return False
+    else:
+      return instances
+
+  def launch_instances(self, role, number, image_id, size_id,
+                       instance_user_data, **kwargs):
+    """
+    Launch instances (of the given role) in the cluster.
+    Returns a list of IDs for the instances started.
+    """
+    pass
+
+  def wait_for_instances(self, instance_ids, timeout=600):
+    """
+    Wait for instances to start.
+    Raise TimeoutException if the timeout is exceeded.
+    """
+    pass
+
+  def terminate(self):
+    """
+    Terminate all instances in the cluster.
+    """
+    pass
+
+  def delete(self):
+    """
+    Delete the cluster permanently. This operation is only permitted if no
+    instances are running.
+    """
+    pass
+
+  def get_storage(self):
+    """
+    Return the external storage for the cluster.
+    """
+    return Storage(self)
+
+class InstanceUserData(object):
+  """
+  The data passed to an instance on start up.
+  """
+
+  def __init__(self, filename, replacements={}):
+    self.filename = filename
+    self.replacements = replacements
+
+  def _read_file(self, filename):
+    """
+    Read the user data.
+    """
+    return urllib.urlopen(filename).read()
+
+  def read(self):
+    """
+    Read the user data, making replacements.
+    """
+    contents = self._read_file(self.filename)
+    for (match, replacement) in self.replacements.iteritems():
+      if replacement == None:
+        replacement = ''
+      contents = contents.replace(match, replacement)
+    return contents
+
+  def read_as_gzip_stream(self):
+    """
+    Read and compress the data.
+    """
+    output = StringIO.StringIO()
+    compressed = gzip.GzipFile(mode='wb', fileobj=output)
+    compressed.write(self.read())
+    compressed.close()
+    return output.getvalue()
+
+class Instance(object):
+  """
+  A server instance.
+  """
+  def __init__(self, id, public_ip, private_ip):
+    self.id = id
+    self.public_ip = public_ip
+    self.private_ip = private_ip
+
+class RoleSyntaxException(Exception):
+  """
+  Raised when a role name is invalid. Role names may consist of a sequence
+  of alphanumeric characters and underscores. Dashes are not permitted in role
+  names.
+  """
+  def __init__(self, message):
+    super(RoleSyntaxException, self).__init__()
+    self.message = message
+  def __str__(self):
+    return repr(self.message)
+
+class TimeoutException(Exception):
+  """
+  Raised when a timeout is exceeded.
+  """
+  pass

+ 261 - 0
src/contrib/cloud/src/py/hadoop/cloud/commands.py

@@ -0,0 +1,261 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""High-level commands that a user may want to run"""
+
+from __future__ import with_statement
+
+from hadoop.cloud.cluster import get_cluster
+from hadoop.cloud.cluster import InstanceUserData
+from hadoop.cloud.cluster import TimeoutException
+from hadoop.cloud.util import build_env_string
+from hadoop.cloud.util import url_get
+import logging
+import os
+import re
+import socket
+import sys
+import time
+
+logger = logging.getLogger(__name__)
+
+MASTER = "master"
+SLAVE = "slave"
+# ROLES contains the set of all roles in a cluster. It may be expanded in the
+# future to support HBase, or split namnode and jobtracker instances.
+ROLES = (MASTER, SLAVE)
+
+def _get_default_user_data_file_template(cluster):
+  return os.path.join(sys.path[0], 'hadoop-%s-init-remote.sh' %
+               cluster.get_provider_code())
+
+def list_all(provider):
+  """
+  Find and print clusters that have a running 'master' instance
+  """
+  clusters = get_cluster(provider).get_clusters_with_role(MASTER)
+  if not clusters:
+    print "No running clusters"
+  else:
+    for cluster in clusters:
+      print cluster
+
+def list_cluster(cluster):
+  cluster.print_status(ROLES)
+
+def launch_master(cluster, config_dir, image_id, size_id, key_name, public_key,
+                  user_data_file_template=None, placement=None,
+                  user_packages=None, auto_shutdown=None, env_strings=[],
+                  client_cidrs=[]):
+  if user_data_file_template == None:
+    user_data_file_template = _get_default_user_data_file_template(cluster)
+  if cluster.check_running(MASTER, 0) == False:
+    return  # don't proceed if another master is running
+  ebs_mappings = ''
+  storage = cluster.get_storage()
+  if storage.has_any_storage((MASTER,)):
+    ebs_mappings = storage.get_mappings_string_for_role(MASTER)
+  replacements = { "%ENV%": build_env_string(env_strings, {
+    "USER_PACKAGES": user_packages,
+    "AUTO_SHUTDOWN": auto_shutdown,
+    "EBS_MAPPINGS": ebs_mappings
+  }) }
+  instance_user_data = InstanceUserData(user_data_file_template, replacements)
+  instance_ids = cluster.launch_instances(MASTER, 1, image_id, size_id,
+                                          instance_user_data,
+                                          key_name=key_name,
+                                          public_key=public_key,
+                                          placement=placement)
+  print "Waiting for master to start (%s)" % str(instance_ids[0])
+  try:
+    cluster.wait_for_instances(instance_ids)
+    print "Master started"
+  except TimeoutException:
+    print "Timeout while waiting for master instance to start."
+    return
+  cluster.print_status((MASTER,))
+  master = cluster.check_running(MASTER, 1)[0]
+  _authorize_client_ports(cluster, master, client_cidrs)
+  _create_client_hadoop_site_file(cluster, config_dir, master)
+
+def _authorize_client_ports(cluster, master, client_cidrs):
+  if not client_cidrs:
+    logger.debug("No client CIDRs specified, using local address.")
+    client_ip = url_get('http://checkip.amazonaws.com/').strip()
+    client_cidrs = ("%s/32" % client_ip,)
+  logger.debug("Client CIDRs: %s", client_cidrs)
+  for client_cidr in client_cidrs:
+    # Allow access to port 80 on master from client
+    cluster.authorize_role(MASTER, 80, 80, client_cidr)
+    # Allow access to jobtracker UI on master from client
+    # (so we can see when the cluster is ready)
+    cluster.authorize_role(MASTER, 50030, 50030, client_cidr)
+    # Allow access to namenode and jobtracker via public address from master
+    # node
+  master_ip = socket.gethostbyname(master.public_ip)
+  cluster.authorize_role(MASTER, 8020, 8021, "%s/32" % master_ip)
+
+def _create_client_hadoop_site_file(cluster, config_dir, master):
+  cluster_dir = os.path.join(config_dir, cluster.name)
+  aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID']
+  aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY']
+  if not os.path.exists(cluster_dir):
+    os.makedirs(cluster_dir)
+  with open(os.path.join(cluster_dir, 'hadoop-site.xml'), 'w') as f:
+    f.write("""<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!-- Put site-specific property overrides in this file. -->
+<configuration>
+<property>
+  <name>hadoop.job.ugi</name>
+  <value>root,root</value>
+</property>
+<property>
+  <name>fs.default.name</name>
+  <value>hdfs://%(master)s:8020/</value>
+</property>
+<property>
+  <name>mapred.job.tracker</name>
+  <value>%(master)s:8021</value>
+</property>
+<property>
+  <name>hadoop.socks.server</name>
+  <value>localhost:6666</value>
+</property>
+<property>
+  <name>hadoop.rpc.socket.factory.class.default</name>
+  <value>org.apache.hadoop.net.SocksSocketFactory</value>
+</property>
+<property>
+  <name>fs.s3.awsAccessKeyId</name>
+  <value>%(aws_access_key_id)s</value>
+</property>
+<property>
+  <name>fs.s3.awsSecretAccessKey</name>
+  <value>%(aws_secret_access_key)s</value>
+</property>
+<property>
+  <name>fs.s3n.awsAccessKeyId</name>
+  <value>%(aws_access_key_id)s</value>
+</property>
+<property>
+  <name>fs.s3n.awsSecretAccessKey</name>
+  <value>%(aws_secret_access_key)s</value>
+</property>
+</configuration>
+""" % {'master': master.public_ip,
+  'aws_access_key_id': aws_access_key_id,
+  'aws_secret_access_key': aws_secret_access_key})
+
+def launch_slaves(cluster, number, image_id, size_id, key_name,
+                  public_key,
+                  user_data_file_template=None, placement=None,
+                  user_packages=None, auto_shutdown=None, env_strings=[]):
+  if user_data_file_template == None:
+    user_data_file_template = _get_default_user_data_file_template(cluster)
+  instances = cluster.check_running(MASTER, 1)
+  if not instances:
+    return
+  master = instances[0]
+  ebs_mappings = ''
+  storage = cluster.get_storage()
+  if storage.has_any_storage((SLAVE,)):
+    ebs_mappings = storage.get_mappings_string_for_role(SLAVE)
+  replacements = { "%ENV%": build_env_string(env_strings, {
+    "USER_PACKAGES": user_packages,
+    "AUTO_SHUTDOWN": auto_shutdown,
+    "EBS_MAPPINGS": ebs_mappings,
+    "MASTER_HOST": master.public_ip
+  }) }
+  instance_user_data = InstanceUserData(user_data_file_template, replacements)
+  instance_ids = cluster.launch_instances(SLAVE, number, image_id, size_id,
+                                          instance_user_data,
+                                          key_name=key_name,
+                                          public_key=public_key,
+                                          placement=placement)
+  print "Waiting for slaves to start"
+  try:
+    cluster.wait_for_instances(instance_ids)
+    print "Slaves started"
+  except TimeoutException:
+    print "Timeout while waiting for slave instances to start."
+    return
+  print
+  cluster.print_status((SLAVE,))
+
+def wait_for_hadoop(cluster, number, timeout=600):
+  start_time = time.time()
+  instances = cluster.check_running(MASTER, 1)
+  if not instances:
+    return
+  master = instances[0]
+  print "Waiting for jobtracker to start"
+  previous_running = 0
+  while True:
+    if (time.time() - start_time >= timeout):
+      raise TimeoutException()
+    try:
+      actual_running = _number_of_tasktrackers(master.public_ip, 1)
+      break
+    except IOError:
+      pass
+    sys.stdout.write(".")
+    sys.stdout.flush()
+    time.sleep(1)
+  print
+  if number > 0:
+    print "Waiting for %d tasktrackers to start" % number
+    while actual_running < number:
+      if (time.time() - start_time >= timeout):
+        raise TimeoutException()
+      try:
+        actual_running = _number_of_tasktrackers(master.public_ip, 5, 2)
+        if actual_running != previous_running:
+          sys.stdout.write("%d" % actual_running)
+        sys.stdout.write(".")
+        sys.stdout.flush()
+        time.sleep(1)
+        previous_running = actual_running
+      except IOError:
+        raise TimeoutException()
+    print
+
+# The optional ?type=active is a difference between Hadoop 0.18 and 0.20
+NUMBER_OF_TASK_TRACKERS = re.compile(
+  r'<a href="machines.jsp(?:\?type=active)?">(\d+)</a>')
+
+def _number_of_tasktrackers(jt_hostname, timeout, retries=0):
+  jt_page = url_get("http://%s:50030/jobtracker.jsp" % jt_hostname, timeout,
+                    retries)
+  m = NUMBER_OF_TASK_TRACKERS.search(jt_page)
+  if m:
+    return int(m.group(1))
+  return 0
+
+def print_master_url(cluster):
+  instances = cluster.check_running(MASTER, 1)
+  if not instances:
+    return
+  master = instances[0]
+  print "Browse the cluster at http://%s/" % master.public_ip
+
+def attach_storage(cluster, roles):
+  storage = cluster.get_storage()
+  if storage.has_any_storage(roles):
+    print "Waiting 10 seconds before attaching storage"
+    time.sleep(10)
+    for role in roles:
+      storage.attach(role, cluster.get_instances_in_role(role, 'running'))
+    storage.print_status(roles)

+ 14 - 0
src/contrib/cloud/src/py/hadoop/cloud/providers/__init__.py

@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+ 61 - 0
src/contrib/cloud/src/py/hadoop/cloud/providers/dummy.py

@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from hadoop.cloud.cluster import Cluster
+from hadoop.cloud.cluster import Instance
+
+logger = logging.getLogger(__name__)
+
+class DummyCluster(Cluster):
+
+  @staticmethod
+  def get_clusters_with_role(role, state="running"):
+    logger.info("get_clusters_with_role(%s, %s)", role, state)
+    return ["dummy-cluster"]
+
+  def __init__(self, name, config_dir):
+    super(DummyCluster, self).__init__(name, config_dir)
+    logger.info("__init__(%s, %s)", name, config_dir)
+
+  def get_provider_code(self):
+    return "dummy"
+
+  def authorize_role(self, role, from_port, to_port, cidr_ip):
+    logger.info("authorize_role(%s, %s, %s, %s)", role, from_port, to_port,
+                cidr_ip)
+
+  def get_instances_in_role(self, role, state_filter=None):
+    logger.info("get_instances_in_role(%s, %s)", role, state_filter)
+    return [Instance(1, '127.0.0.1', '127.0.0.1')]
+
+  def print_status(self, roles, state_filter="running"):
+    logger.info("print_status(%s, %s)", roles, state_filter)
+
+  def launch_instances(self, role, number, image_id, size_id,
+                       instance_user_data, **kwargs):
+    logger.info("launch_instances(%s, %s, %s, %s, %s, %s)", role, number,
+                image_id, size_id, instance_user_data, str(kwargs))
+    return [1]
+
+  def wait_for_instances(self, instance_ids, timeout=600):
+    logger.info("wait_for_instances(%s, %s)", instance_ids, timeout)
+
+  def terminate(self):
+    logger.info("terminate")
+
+  def delete(self):
+    logger.info("delete")

+ 460 - 0
src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py

@@ -0,0 +1,460 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from boto.ec2.connection import EC2Connection
+from boto.exception import EC2ResponseError
+import logging
+from hadoop.cloud.cluster import Cluster
+from hadoop.cloud.cluster import Instance
+from hadoop.cloud.cluster import RoleSyntaxException
+from hadoop.cloud.cluster import TimeoutException
+from hadoop.cloud.storage import JsonVolumeManager
+from hadoop.cloud.storage import JsonVolumeSpecManager
+from hadoop.cloud.storage import MountableVolume
+from hadoop.cloud.storage import Storage
+from hadoop.cloud.util import xstr
+import os
+import re
+import subprocess
+import sys
+import time
+
+logger = logging.getLogger(__name__)
+
+def _run_command_on_instance(instance, ssh_options, command):
+  print "Running ssh %s root@%s '%s'" % \
+    (ssh_options, instance.public_dns_name, command)
+  retcode = subprocess.call("ssh %s root@%s '%s'" %
+                           (ssh_options, instance.public_dns_name, command),
+                           shell=True)
+  print "Command running on %s returned with value %s" % \
+    (instance.public_dns_name, retcode)
+
+def _wait_for_volume(ec2_connection, volume_id):
+  """
+  Waits until a volume becomes available.
+  """
+  while True:
+    volumes = ec2_connection.get_all_volumes([volume_id,])
+    if volumes[0].status == 'available':
+      break
+    sys.stdout.write(".")
+    sys.stdout.flush()
+    time.sleep(1)
+
+class Ec2Cluster(Cluster):
+  """
+  A cluster of EC2 instances. A cluster has a unique name.
+
+  Instances running in the cluster run in a security group with the cluster's
+  name, and also a name indicating the instance's role, e.g. <cluster-name>-foo
+  to show a "foo" instance.
+  """
+
+  @staticmethod
+  def get_clusters_with_role(role, state="running"):
+    all_instances = EC2Connection().get_all_instances()
+    clusters = []
+    for res in all_instances:
+      instance = res.instances[0]
+      for group in res.groups:
+        if group.id.endswith("-" + role) and instance.state == state:
+          clusters.append(re.sub("-%s$" % re.escape(role), "", group.id))
+    return clusters
+
+  def __init__(self, name, config_dir):
+    super(Ec2Cluster, self).__init__(name, config_dir)
+    self.ec2Connection = EC2Connection()
+
+  def get_provider_code(self):
+    return "ec2"
+
+  def _get_cluster_group_name(self):
+    return self.name
+
+  def _check_role_name(self, role):
+    if not re.match("^[a-zA-Z0-9_]+$", role):
+      raise RoleSyntaxException("Invalid role name '%s'" % role)
+
+  def _group_name_for_role(self, role):
+    """
+    Return the security group name for an instance in a given role.
+    """
+    self._check_role_name(role)
+    return "%s-%s" % (self.name, role)
+
+  def _get_group_names(self, role):
+    self._check_role_name(role)
+    return [self._get_cluster_group_name(), self._group_name_for_role(role)]
+
+  def _get_all_group_names(self):
+    security_groups = self.ec2Connection.get_all_security_groups()
+    security_group_names = \
+      [security_group.name for security_group in security_groups]
+    return security_group_names
+
+  def _get_all_group_names_for_cluster(self):
+    all_group_names = self._get_all_group_names()
+    r = []
+    if self.name not in all_group_names:
+      return r
+    for group in all_group_names:
+      if re.match("^%s(-[a-zA-Z0-9_]+)?$" % self.name, group):
+        r.append(group)
+    return r
+
+  def _create_groups(self, role):
+    """
+    Create the security groups for a given role, including a group for the
+    cluster if it doesn't exist.
+    """
+    self._check_role_name(role)
+    security_group_names = self._get_all_group_names()
+
+    cluster_group_name = self._get_cluster_group_name()
+    if not cluster_group_name in security_group_names:
+      self.ec2Connection.create_security_group(cluster_group_name,
+                                               "Cluster (%s)" % (self.name))
+      self.ec2Connection.authorize_security_group(cluster_group_name,
+                                                  cluster_group_name)
+      # Allow SSH from anywhere
+      self.ec2Connection.authorize_security_group(cluster_group_name,
+                                                  ip_protocol="tcp",
+                                                  from_port=22, to_port=22,
+                                                  cidr_ip="0.0.0.0/0")
+
+    role_group_name = self._group_name_for_role(role)
+    if not role_group_name in security_group_names:
+      self.ec2Connection.create_security_group(role_group_name,
+        "Role %s (%s)" % (role, self.name))
+
+  def authorize_role(self, role, from_port, to_port, cidr_ip):
+    """
+    Authorize access to machines in a given role from a given network.
+    """
+    self._check_role_name(role)
+    role_group_name = self._group_name_for_role(role)
+    # Revoke first to avoid InvalidPermission.Duplicate error
+    self.ec2Connection.revoke_security_group(role_group_name,
+                                             ip_protocol="tcp",
+                                             from_port=from_port,
+                                             to_port=to_port, cidr_ip=cidr_ip)
+    self.ec2Connection.authorize_security_group(role_group_name,
+                                                ip_protocol="tcp",
+                                                from_port=from_port,
+                                                to_port=to_port,
+                                                cidr_ip=cidr_ip)
+
+  def _get_instances(self, group_name, state_filter=None):
+    """
+    Get all the instances in a group, filtered by state.
+
+    @param group_name: the name of the group
+    @param state_filter: the state that the instance should be in
+      (e.g. "running"), or None for all states
+    """
+    all_instances = self.ec2Connection.get_all_instances()
+    instances = []
+    for res in all_instances:
+      for group in res.groups:
+        if group.id == group_name:
+          for instance in res.instances:
+            if state_filter == None or instance.state == state_filter:
+              instances.append(instance)
+    return instances
+
+  def get_instances_in_role(self, role, state_filter=None):
+    """
+    Get all the instances in a role, filtered by state.
+
+    @param role: the name of the role
+    @param state_filter: the state that the instance should be in
+      (e.g. "running"), or None for all states
+    """
+    self._check_role_name(role)
+    instances = []
+    for instance in self._get_instances(self._group_name_for_role(role),
+                                        state_filter):
+      instances.append(Instance(instance.id, instance.dns_name,
+                                instance.private_dns_name))
+    return instances
+
+  def _print_instance(self, role, instance):
+    print "\t".join((role, instance.id,
+      instance.image_id,
+      instance.dns_name, instance.private_dns_name,
+      instance.state, xstr(instance.key_name), instance.instance_type,
+      str(instance.launch_time), instance.placement))
+
+  def print_status(self, roles, state_filter="running"):
+    """
+    Print the status of instances in the given roles, filtered by state.
+    """
+    for role in roles:
+      for instance in self._get_instances(self._group_name_for_role(role),
+                                          state_filter):
+        self._print_instance(role, instance)
+
+  def launch_instances(self, role, number, image_id, size_id,
+                       instance_user_data, **kwargs):
+    self._check_role_name(role)
+
+    self._create_groups(role)
+    user_data = instance_user_data.read_as_gzip_stream()
+
+    reservation = self.ec2Connection.run_instances(image_id, min_count=number,
+      max_count=number, key_name=kwargs.get('key_name', None),
+      security_groups=self._get_group_names(role), user_data=user_data,
+      instance_type=size_id, placement=kwargs.get('placement', None))
+    return [instance.id for instance in reservation.instances]
+
+  def wait_for_instances(self, instance_ids, timeout=600):
+    start_time = time.time()
+    while True:
+      if (time.time() - start_time >= timeout):
+        raise TimeoutException()
+      try:
+        if self._all_started(self.ec2Connection.get_all_instances(instance_ids)):
+          break
+      # don't timeout for race condition where instance is not yet registered
+      except EC2ResponseError:
+        pass
+      sys.stdout.write(".")
+      sys.stdout.flush()
+      time.sleep(1)
+
+  def _all_started(self, reservations):
+    for res in reservations:
+      for instance in res.instances:
+        if instance.state != "running":
+          return False
+    return True
+
+  def terminate(self):
+    instances = self._get_instances(self._get_cluster_group_name(), "running")
+    if instances:
+      self.ec2Connection.terminate_instances([i.id for i in instances])
+
+  def delete(self):
+    """
+    Delete the security groups for each role in the cluster, and the group for
+    the cluster.
+    """
+    group_names = self._get_all_group_names_for_cluster()
+    for group in group_names:
+      self.ec2Connection.delete_security_group(group)
+
+  def get_storage(self):
+    """
+    Return the external storage for the cluster.
+    """
+    return Ec2Storage(self)
+
+
+class Ec2Storage(Storage):
+  """
+  Storage volumes for an EC2 cluster. The storage is associated with a named
+  cluster. Metadata for the storage volumes is kept in a JSON file on the client
+  machine (in a file called "ec2-storage-<cluster-name>.json" in the
+  configuration directory).
+  """
+
+  @staticmethod
+  def create_formatted_snapshot(cluster, size, availability_zone, image_id,
+                                key_name, ssh_options):
+    """
+    Creates a formatted snapshot of a given size. This saves having to format
+    volumes when they are first attached.
+    """
+    conn = cluster.ec2Connection
+    print "Starting instance"
+    reservation = conn.run_instances(image_id, key_name=key_name,
+                                     placement=availability_zone)
+    instance = reservation.instances[0]
+    try:
+      cluster.wait_for_instances([instance.id,])
+      print "Started instance %s" % instance.id
+    except TimeoutException:
+      print "Timeout"
+      return
+    print
+    print "Waiting 60 seconds before attaching storage"
+    time.sleep(60)
+    # Re-populate instance object since it has more details filled in
+    instance.update()
+
+    print "Creating volume of size %s in %s" % (size, availability_zone)
+    volume = conn.create_volume(size, availability_zone)
+    print "Created volume %s" % volume
+    print "Attaching volume to %s" % instance.id
+    volume.attach(instance.id, '/dev/sdj')
+
+    _run_command_on_instance(instance, ssh_options, """
+      while true ; do
+        echo 'Waiting for /dev/sdj...';
+        if [ -e /dev/sdj ]; then break; fi;
+        sleep 1;
+      done;
+      mkfs.ext3 -F -m 0.5 /dev/sdj
+    """)
+
+    print "Detaching volume"
+    conn.detach_volume(volume.id, instance.id)
+    print "Creating snapshot"
+    snapshot = volume.create_snapshot()
+    print "Created snapshot %s" % snapshot.id
+    _wait_for_volume(conn, volume.id)
+    print
+    print "Deleting volume"
+    volume.delete()
+    print "Deleted volume"
+    print "Stopping instance"
+    terminated = conn.terminate_instances([instance.id,])
+    print "Stopped instance %s" % terminated
+
+  def __init__(self, cluster):
+    super(Ec2Storage, self).__init__(cluster)
+    self.config_dir = cluster.config_dir
+
+  def _get_storage_filename(self):
+    return os.path.join(self.config_dir,
+                        "ec2-storage-%s.json" % (self.cluster.name))
+
+  def create(self, role, number_of_instances, availability_zone, spec_filename):
+    spec_file = open(spec_filename, 'r')
+    volume_spec_manager = JsonVolumeSpecManager(spec_file)
+    volume_manager = JsonVolumeManager(self._get_storage_filename())
+    for dummy in range(number_of_instances):
+      mountable_volumes = []
+      volume_specs = volume_spec_manager.volume_specs_for_role(role)
+      for spec in volume_specs:
+        logger.info("Creating volume of size %s in %s from snapshot %s" % \
+                    (spec.size, availability_zone, spec.snapshot_id))
+        volume = self.cluster.ec2Connection.create_volume(spec.size,
+                                                          availability_zone,
+                                                          spec.snapshot_id)
+        mountable_volumes.append(MountableVolume(volume.id, spec.mount_point,
+                                                 spec.device))
+      volume_manager.add_instance_storage_for_role(role, mountable_volumes)
+
+  def _get_mountable_volumes(self, role):
+    storage_filename = self._get_storage_filename()
+    volume_manager = JsonVolumeManager(storage_filename)
+    return volume_manager.get_instance_storage_for_role(role)
+
+  def get_mappings_string_for_role(self, role):
+    mappings = {}
+    mountable_volumes_list = self._get_mountable_volumes(role)
+    for mountable_volumes in mountable_volumes_list:
+      for mountable_volume in mountable_volumes:
+        mappings[mountable_volume.mount_point] = mountable_volume.device
+    return ";".join(["%s,%s" % (mount_point, device) for (mount_point, device)
+                     in mappings.items()])
+
+  def _has_storage(self, role):
+    return self._get_mountable_volumes(role)
+
+  def has_any_storage(self, roles):
+    for role in roles:
+      if self._has_storage(role):
+        return True
+    return False
+
+  def _get_ec2_volumes_dict(self, mountable_volumes):
+    volume_ids = [mv.volume_id for mv in sum(mountable_volumes, [])]
+    volumes = self.cluster.ec2Connection.get_all_volumes(volume_ids)
+    volumes_dict = {}
+    for volume in volumes:
+      volumes_dict[volume.id] = volume
+    return volumes_dict
+
+  def _print_volume(self, role, volume):
+    print "\t".join((role, volume.id, str(volume.size),
+                     volume.snapshot_id, volume.availabilityZone,
+                     volume.status, str(volume.create_time),
+                     str(volume.attach_time)))
+
+  def print_status(self, roles):
+    for role in roles:
+      mountable_volumes_list = self._get_mountable_volumes(role)
+      ec2_volumes = self._get_ec2_volumes_dict(mountable_volumes_list)
+      for mountable_volumes in mountable_volumes_list:
+        for mountable_volume in mountable_volumes:
+          self._print_volume(role, ec2_volumes[mountable_volume.volume_id])
+
+  def _replace(self, string, replacements):
+    for (match, replacement) in replacements.iteritems():
+      string = string.replace(match, replacement)
+    return string
+
+  def attach(self, role, instances):
+    mountable_volumes_list = self._get_mountable_volumes(role)
+    if not mountable_volumes_list:
+      return
+    ec2_volumes = self._get_ec2_volumes_dict(mountable_volumes_list)
+
+    available_mountable_volumes_list = []
+
+    available_instances_dict = {}
+    for instance in instances:
+      available_instances_dict[instance.id] = instance
+
+    # Iterate over mountable_volumes and retain those that are not attached
+    # Also maintain a list of instances that have no attached storage
+    # Note that we do not fill in "holes" (instances that only have some of
+    # their storage attached)
+    for mountable_volumes in mountable_volumes_list:
+      available = True
+      for mountable_volume in mountable_volumes:
+        if ec2_volumes[mountable_volume.volume_id].status != 'available':
+          available = False
+          attach_data = ec2_volumes[mountable_volume.volume_id].attach_data
+          instance_id = attach_data.instance_id
+          if available_instances_dict.has_key(instance_id):
+            del available_instances_dict[instance_id]
+      if available:
+        available_mountable_volumes_list.append(mountable_volumes)
+
+    if len(available_instances_dict) != len(available_mountable_volumes_list):
+      logger.warning("Number of available instances (%s) and volumes (%s) \
+        do not match." \
+        % (len(available_instances_dict),
+           len(available_mountable_volumes_list)))
+
+    for (instance, mountable_volumes) in zip(available_instances_dict.values(),
+                                             available_mountable_volumes_list):
+      print "Attaching storage to %s" % instance.id
+      for mountable_volume in mountable_volumes:
+        volume = ec2_volumes[mountable_volume.volume_id]
+        print "Attaching %s to %s" % (volume.id, instance.id)
+        volume.attach(instance.id, mountable_volume.device)
+
+  def delete(self, role):
+    storage_filename = self._get_storage_filename()
+    volume_manager = JsonVolumeManager(storage_filename)
+    mountable_volumes_list = volume_manager.get_instance_storage_for_role(role)
+    ec2_volumes = self._get_ec2_volumes_dict(mountable_volumes_list)
+    all_available = True
+    for volume in ec2_volumes.itervalues():
+      if volume.status != 'available':
+        all_available = False
+        logger.warning("Volume %s is not available.", volume)
+    if not all_available:
+      logger.warning("Some volumes are still in use for role %s.\
+        Aborting delete.", role)
+      return
+    for volume in ec2_volumes.itervalues():
+      volume.delete()
+    volume_manager.remove_instance_storage_for_role(role)

+ 163 - 0
src/contrib/cloud/src/py/hadoop/cloud/storage.py

@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Classes for controlling external cluster storage.
+"""
+
+import logging
+import simplejson as json
+
+logger = logging.getLogger(__name__)
+
+class VolumeSpec(object):
+  """
+  The specification for a storage volume, encapsulating all the information
+  needed to create a volume and ultimately mount it on an instance.
+  """
+  def __init__(self, size, mount_point, device, snapshot_id):
+    self.size = size
+    self.mount_point = mount_point
+    self.device = device
+    self.snapshot_id = snapshot_id
+
+
+class JsonVolumeSpecManager(object):
+  """
+  A container for VolumeSpecs. This object can read VolumeSpecs specified in
+  JSON.
+  """
+  def __init__(self, spec_file):
+    self.spec = json.load(spec_file)
+
+  def volume_specs_for_role(self, role):
+    return [VolumeSpec(d["size_gb"], d["mount_point"], d["device"],
+                       d["snapshot_id"]) for d in self.spec[role]]
+
+  def get_mappings_string_for_role(self, role):
+    """
+    Returns a short string of the form
+    "mount_point1,device1;mount_point2,device2;..."
+    which is useful for passing as an environment variable.
+    """
+    return ";".join(["%s,%s" % (d["mount_point"], d["device"])
+                     for d in self.spec[role]])
+
+
+class MountableVolume(object):
+  """
+  A storage volume that has been created. It may or may not have been attached
+  or mounted to an instance.
+  """
+  def __init__(self, volume_id, mount_point, device):
+    self.volume_id = volume_id
+    self.mount_point = mount_point
+    self.device = device
+
+
+class JsonVolumeManager(object):
+
+  def __init__(self, filename):
+    self.filename = filename
+
+  def _load(self):
+    try:
+      return json.load(open(self.filename, "r"))
+    except IOError:
+      logger.debug("File %s does not exist.", self.filename)
+      return {}
+
+  def _store(self, obj):
+    return json.dump(obj, open(self.filename, "w"), sort_keys=True, indent=2)
+
+  def add_instance_storage_for_role(self, role, mountable_volumes):
+    json_dict = self._load()
+    mv_dicts = [mv.__dict__ for mv in mountable_volumes]
+    json_dict.setdefault(role, []).append(mv_dicts)
+    self._store(json_dict)
+
+  def remove_instance_storage_for_role(self, role):
+    json_dict = self._load()
+    del json_dict[role]
+    self._store(json_dict)
+
+  def get_instance_storage_for_role(self, role):
+    """
+    Returns a list of lists of MountableVolume objects. Each nested list is
+    the storage for one instance.
+    """
+    try:
+      json_dict = self._load()
+      instance_storage = []
+      for instance in json_dict[role]:
+        vols = []
+        for vol in instance:
+          vols.append(MountableVolume(vol["volume_id"], vol["mount_point"],
+                                      vol["device"]))
+        instance_storage.append(vols)
+      return instance_storage
+    except KeyError:
+      return []
+
+class Storage(object):
+  """
+  Storage volumes for a cluster. The storage is associated with a named
+  cluster. Many clusters just have local storage, in which case this is
+  not used.
+  """
+
+  def __init__(self, cluster):
+    self.cluster = cluster
+
+  def create(self, role, number_of_instances, availability_zone, spec_filename):
+    """
+    Create new storage volumes for instances with the given role, according to
+    the mapping defined in the spec file.
+    """
+    pass
+
+  def get_mappings_string_for_role(self, role):
+    """
+    Returns a short string of the form
+    "mount_point1,device1;mount_point2,device2;..."
+    which is useful for passing as an environment variable.
+    """
+    raise Exception("Unimplemented")
+
+  def has_any_storage(self, roles):
+    """
+    Return True if any of the given roles has associated storage
+    """
+    return False
+
+  def print_status(self, roles):
+    """
+    Print the status of storage volumes for the given roles.
+    """
+    pass
+
+  def attach(self, role, instances):
+    """
+    Attach volumes for a role to instances. Some volumes may already be
+    attached, in which case they are ignored, and we take care not to attach
+    multiple volumes to an instance.
+    """
+    pass
+
+  def delete(self, role):
+    """
+    Permanently delete all the storage for a role.
+    """
+    pass

+ 84 - 0
src/contrib/cloud/src/py/hadoop/cloud/util.py

@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Utility functions.
+"""
+
+import ConfigParser
+import socket
+import urllib2
+
+def bash_quote(text):
+  """Quotes a string for bash, by using single quotes."""
+  if text == None:
+    return ""
+  return "'%s'" % text.replace("'", "'\\''")
+
+def bash_quote_env(env):
+  """Quotes the value in an environment variable assignment."""
+  if env.find("=") == -1:
+    return env
+  (var, value) = env.split("=")
+  return "%s=%s" % (var, bash_quote(value))
+
+def build_env_string(env_strings=[], pairs={}):
+  """Build a bash environment variable assignment"""
+  env = ''
+  if env_strings:
+    for env_string in env_strings:
+      env += "%s " % bash_quote_env(env_string)
+  if pairs:
+    for key, val in pairs.items():
+      env += "%s=%s " % (key, bash_quote(val))
+  return env[:-1]
+
+def merge_config_with_options(section_name, config, options):
+  """
+  Merge configuration options with a dictionary of options.
+  Keys in the options dictionary take precedence.
+  """
+  res = {}
+  try:
+    for (key, value) in config.items(section_name):
+      if value.find("\n") != -1:
+        res[key] = value.split("\n")
+      else:
+        res[key] = value
+  except ConfigParser.NoSectionError:
+    pass
+  for key in options:
+    if options[key] != None:
+      res[key] = options[key]
+  return res
+
+def url_get(url, timeout=10, retries=0):
+  """
+  Retrieve content from the given URL.
+  """
+   # in Python 2.6 we can pass timeout to urllib2.urlopen
+  socket.setdefaulttimeout(timeout)
+  attempts = 0
+  while True:
+    try:
+      return urllib2.urlopen(url).read()
+    except urllib2.URLError:
+      attempts = attempts + 1
+      if attempts > retries:
+        raise
+
+def xstr(string):
+  """Sane string conversion: return an empty string if string is None."""
+  return '' if string is None else str(string)

+ 14 - 0
src/contrib/cloud/src/test/hadoop/__init__.py

@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+ 14 - 0
src/contrib/cloud/src/test/hadoop/cloud/__init__.py

@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+ 36 - 0
src/contrib/cloud/src/test/hadoop/cloud/alltests.py

@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import unittest
+from hadoop.cloud.testcluster import TestCluster
+from hadoop.cloud.teststorage import TestJsonVolumeSpecManager
+from hadoop.cloud.teststorage import TestJsonVolumeManager
+from hadoop.cloud.testuserdata import TestInstanceUserData
+from hadoop.cloud.testutil import TestUtilFunctions
+
+def testSuite():
+  alltests = unittest.TestSuite([
+    unittest.makeSuite(TestCluster, 'test'),
+    unittest.makeSuite(TestJsonVolumeSpecManager, 'test'),
+    unittest.makeSuite(TestJsonVolumeManager, 'test'),
+    unittest.makeSuite(TestInstanceUserData, 'test'),
+    unittest.makeSuite(TestUtilFunctions, 'test'),
+  ])
+  return alltests
+
+if __name__ == "__main__":
+  runner = unittest.TextTestRunner()
+  sys.exit(not runner.run(testSuite()).wasSuccessful())

+ 37 - 0
src/contrib/cloud/src/test/hadoop/cloud/testcluster.py

@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from hadoop.cloud.cluster import RoleSyntaxException
+from hadoop.cloud.providers.ec2 import Ec2Cluster
+
+class TestCluster(unittest.TestCase):
+
+  def test_group_name_for_role(self):
+    cluster = Ec2Cluster("test-cluster", None)
+    self.assertEqual("test-cluster-foo", cluster._group_name_for_role("foo"))
+
+  def test_check_role_name_valid(self):
+    cluster = Ec2Cluster("test-cluster", None)
+    cluster._check_role_name(
+      "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_")
+
+  def test_check_role_name_dash_is_invalid(self):
+    cluster = Ec2Cluster("test-cluster", None)
+    self.assertRaises(RoleSyntaxException, cluster._check_role_name, "a-b")
+
+if __name__ == '__main__':
+  unittest.main()

+ 137 - 0
src/contrib/cloud/src/test/hadoop/cloud/teststorage.py

@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import unittest
+
+import simplejson as json
+from StringIO import StringIO
+
+from hadoop.cloud.storage import MountableVolume
+from hadoop.cloud.storage import JsonVolumeManager
+from hadoop.cloud.storage import JsonVolumeSpecManager
+
+spec = {
+ "master": ({"size_gb":"8", "mount_point":"/", "device":"/dev/sdj",
+             "snapshot_id": "snap_1"},
+            ),
+ "slave": ({"size_gb":"8", "mount_point":"/", "device":"/dev/sdj",
+            "snapshot_id": "snap_2"},
+           {"size_gb":"10", "mount_point":"/data1", "device":"/dev/sdk",
+            "snapshot_id": "snap_3"},
+           )
+ }
+
+class TestJsonVolumeSpecManager(unittest.TestCase):
+
+  def test_volume_specs_for_role(self):
+
+    input = StringIO(json.dumps(spec))
+
+    volume_spec_manager = JsonVolumeSpecManager(input)
+
+    master_specs = volume_spec_manager.volume_specs_for_role("master")
+    self.assertEqual(1, len(master_specs))
+    self.assertEqual("/", master_specs[0].mount_point)
+    self.assertEqual("8", master_specs[0].size)
+    self.assertEqual("/dev/sdj", master_specs[0].device)
+    self.assertEqual("snap_1", master_specs[0].snapshot_id)
+
+    slave_specs = volume_spec_manager.volume_specs_for_role("slave")
+    self.assertEqual(2, len(slave_specs))
+    self.assertEqual("snap_2", slave_specs[0].snapshot_id)
+    self.assertEqual("snap_3", slave_specs[1].snapshot_id)
+
+    self.assertRaises(KeyError, volume_spec_manager.volume_specs_for_role,
+                      "no-such-role")
+
+  def test_get_mappings_string_for_role(self):
+
+    input = StringIO(json.dumps(spec))
+
+    volume_spec_manager = JsonVolumeSpecManager(input)
+
+    master_mappings = volume_spec_manager.get_mappings_string_for_role("master")
+    self.assertEqual("/,/dev/sdj", master_mappings)
+
+    slave_mappings = volume_spec_manager.get_mappings_string_for_role("slave")
+    self.assertEqual("/,/dev/sdj;/data1,/dev/sdk", slave_mappings)
+
+    self.assertRaises(KeyError,
+                      volume_spec_manager.get_mappings_string_for_role,
+                      "no-such-role")
+
+class TestJsonVolumeManager(unittest.TestCase):
+
+  def setUp(self):
+    try:
+      os.remove("volumemanagertest.json")
+    except OSError:
+      pass
+
+  def test_add_instance_storage_for_role(self):
+    volume_manager = JsonVolumeManager("volumemanagertest.json")
+    self.assertEqual(0,
+      len(volume_manager.get_instance_storage_for_role("master")))
+
+    volume_manager.add_instance_storage_for_role("master",
+                                                 [MountableVolume("vol_1", "/",
+                                                                  "/dev/sdj")])
+    master_storage = volume_manager.get_instance_storage_for_role("master")
+    self.assertEqual(1, len(master_storage))
+    master_storage_instance0 = master_storage[0]
+    self.assertEqual(1, len(master_storage_instance0))
+    master_storage_instance0_vol0 = master_storage_instance0[0]
+    self.assertEqual("vol_1", master_storage_instance0_vol0.volume_id)
+    self.assertEqual("/", master_storage_instance0_vol0.mount_point)
+    self.assertEqual("/dev/sdj", master_storage_instance0_vol0.device)
+
+    volume_manager.add_instance_storage_for_role("slave",
+                                                 [MountableVolume("vol_2", "/",
+                                                                  "/dev/sdj")])
+    self.assertEqual(1,
+      len(volume_manager.get_instance_storage_for_role("master")))
+    slave_storage = volume_manager.get_instance_storage_for_role("slave")
+    self.assertEqual(1, len(slave_storage))
+    slave_storage_instance0 = slave_storage[0]
+    self.assertEqual(1, len(slave_storage_instance0))
+    slave_storage_instance0_vol0 = slave_storage_instance0[0]
+    self.assertEqual("vol_2", slave_storage_instance0_vol0.volume_id)
+    self.assertEqual("/", slave_storage_instance0_vol0.mount_point)
+    self.assertEqual("/dev/sdj", slave_storage_instance0_vol0.device)
+
+    volume_manager.add_instance_storage_for_role("slave",
+      [MountableVolume("vol_3", "/", "/dev/sdj"),
+       MountableVolume("vol_4", "/data1", "/dev/sdk")])
+    self.assertEqual(1,
+      len(volume_manager.get_instance_storage_for_role("master")))
+    slave_storage = volume_manager.get_instance_storage_for_role("slave")
+    self.assertEqual(2, len(slave_storage))
+    slave_storage_instance0 = slave_storage[0]
+    slave_storage_instance1 = slave_storage[1]
+    self.assertEqual(1, len(slave_storage_instance0))
+    self.assertEqual(2, len(slave_storage_instance1))
+    slave_storage_instance1_vol0 = slave_storage_instance1[0]
+    slave_storage_instance1_vol1 = slave_storage_instance1[1]
+    self.assertEqual("vol_3", slave_storage_instance1_vol0.volume_id)
+    self.assertEqual("/", slave_storage_instance1_vol0.mount_point)
+    self.assertEqual("/dev/sdj", slave_storage_instance1_vol0.device)
+    self.assertEqual("vol_4", slave_storage_instance1_vol1.volume_id)
+    self.assertEqual("/data1", slave_storage_instance1_vol1.mount_point)
+    self.assertEqual("/dev/sdk", slave_storage_instance1_vol1.device)
+
+
+if __name__ == '__main__':
+  unittest.main()

+ 44 - 0
src/contrib/cloud/src/test/hadoop/cloud/testuserdata.py

@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import tempfile
+import unittest
+
+from hadoop.cloud.cluster import InstanceUserData
+
+class TestInstanceUserData(unittest.TestCase):
+
+  def test_replacement(self):
+    file = tempfile.NamedTemporaryFile()
+    file.write("Contents go here")
+    file.flush()
+    self.assertEqual("Contents go here",
+                     InstanceUserData(file.name, {}).read())
+    self.assertEqual("Contents were here",
+                     InstanceUserData(file.name, { "go": "were"}).read())
+    self.assertEqual("Contents  here",
+                     InstanceUserData(file.name, { "go": None}).read())
+    file.close()
+
+  def test_read_file_url(self):
+    file = tempfile.NamedTemporaryFile()
+    file.write("Contents go here")
+    file.flush()
+    self.assertEqual("Contents go here",
+                     InstanceUserData("file://%s" % file.name, {}).read())
+    file.close()
+
+if __name__ == '__main__':
+  unittest.main()

+ 81 - 0
src/contrib/cloud/src/test/hadoop/cloud/testutil.py

@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import ConfigParser
+import StringIO
+import unittest
+
+from hadoop.cloud.util import bash_quote
+from hadoop.cloud.util import bash_quote_env
+from hadoop.cloud.util import build_env_string
+from hadoop.cloud.util import merge_config_with_options
+from hadoop.cloud.util import xstr
+
+class TestUtilFunctions(unittest.TestCase):
+
+  def test_bash_quote(self):
+    self.assertEqual("", bash_quote(None))
+    self.assertEqual("''", bash_quote(""))
+    self.assertEqual("'a'", bash_quote("a"))
+    self.assertEqual("'a b'", bash_quote("a b"))
+    self.assertEqual("'a\b'", bash_quote("a\b"))
+    self.assertEqual("'a '\\'' b'", bash_quote("a ' b"))
+
+  def test_bash_quote_env(self):
+    self.assertEqual("", bash_quote_env(""))
+    self.assertEqual("a", bash_quote_env("a"))
+    self.assertEqual("a='b'", bash_quote_env("a=b"))
+    self.assertEqual("a='b c'", bash_quote_env("a=b c"))
+    self.assertEqual("a='b\c'", bash_quote_env("a=b\c"))
+    self.assertEqual("a='b '\\'' c'", bash_quote_env("a=b ' c"))
+
+  def test_build_env_string(self):
+    self.assertEqual("", build_env_string())
+    self.assertEqual("a='b' c='d'",
+                     build_env_string(env_strings=["a=b", "c=d"]))
+    self.assertEqual("a='b' c='d'",
+                     build_env_string(pairs={"a": "b", "c": "d"}))
+
+  def test_merge_config_with_options(self):
+    options = { "a": "b" }
+    config = ConfigParser.ConfigParser()
+    self.assertEqual({ "a": "b" },
+                     merge_config_with_options("section", config, options))
+    config.add_section("section")
+    self.assertEqual({ "a": "b" },
+                     merge_config_with_options("section", config, options))
+    config.set("section", "a", "z")
+    config.set("section", "c", "d")
+    self.assertEqual({ "a": "z", "c": "d" },
+                     merge_config_with_options("section", config, {}))
+    self.assertEqual({ "a": "b", "c": "d" },
+                     merge_config_with_options("section", config, options))
+
+  def test_merge_config_with_options_list(self):
+    config = ConfigParser.ConfigParser()
+    config.readfp(StringIO.StringIO("""[section]
+env1=a=b
+ c=d
+env2=e=f
+ g=h"""))
+    self.assertEqual({ "env1": ["a=b", "c=d"], "env2": ["e=f", "g=h"] },
+                     merge_config_with_options("section", config, {}))
+
+  def test_xstr(self):
+    self.assertEqual("", xstr(None))
+    self.assertEqual("a", xstr("a"))
+
+if __name__ == '__main__':
+  unittest.main()