Bläddra i källkod

AMBARI-18940. Perf: Fix bugs in deploy-gce-perf-cluster.py to generate correct config file (alejandro)

Alejandro Fernandez 8 år sedan
förälder
incheckning
52347d2e4e
1 ändrade filer med 145 tillägg och 85 borttagningar
  1. 145 85
      contrib/utils/perf/deploy-gce-perf-cluster.py

+ 145 - 85
contrib/utils/perf/deploy-gce-perf-cluster.py

@@ -27,18 +27,20 @@ import traceback
 import re
 import re
 import socket
 import socket
 
 
-cluster_name="perf-cluster"
-ambari_repo_file_url="http://s3.amazonaws.com/dev.hortonworks.com/ambari/centos6/2.x/latest/trunk/ambaribn.repo"
+cluster_prefix = "perf"
+ambari_repo_file_url = "http://s3.amazonaws.com/dev.hortonworks.com/ambari/centos6/2.x/latest/trunk/ambaribn.repo"
 
 
-public_hostname_script="foo"
-hostname_script="foo"
+public_hostname_script = "foo"
+hostname_script = "foo"
 
 
-start_number=1
-number_of_agents_on_host=50
+NUMBER_OF_AGENTS_ON_HOST = 50
 
 
 
 
 class SSH:
 class SSH:
-  """ Ssh implementation of this """
+  """
+  Ssh implementation of this
+  """
+
   def __init__(self, user, sshkey_file, host, command, custom_option='', errorMessage = None):
   def __init__(self, user, sshkey_file, host, command, custom_option='', errorMessage = None):
     self.user = user
     self.user = user
     self.sshkey_file = sshkey_file
     self.sshkey_file = sshkey_file
@@ -46,8 +48,6 @@ class SSH:
     self.command = command
     self.command = command
     self.errorMessage = errorMessage
     self.errorMessage = errorMessage
     self.custom_option = custom_option
     self.custom_option = custom_option
-    pass
-
 
 
   def run(self):
   def run(self):
     sshcommand = ["ssh",
     sshcommand = ["ssh",
@@ -91,12 +91,15 @@ class SSH:
 
 
     print "SSH command execution finished"
     print "SSH command execution finished"
 
 
-    return  {"exitstatus": sshstat.returncode, "log": log, "errormsg": errorMsg}
+    return {"exitstatus": sshstat.returncode, "log": log, "errormsg": errorMsg}
 
 
 
 
 class SCP:
 class SCP:
-  """ SCP implementation that is thread based. The status can be returned using
-   status val """
+  """
+  SCP implementation that is thread based. The status can be returned using
+  status val
+  """
+
   def __init__(self, user, sshkey_file, host, inputFile, remote, errorMessage = None):
   def __init__(self, user, sshkey_file, host, inputFile, remote, errorMessage = None):
     self.user = user
     self.user = user
     self.sshkey_file = sshkey_file
     self.sshkey_file = sshkey_file
@@ -104,8 +107,6 @@ class SCP:
     self.inputFile = inputFile
     self.inputFile = inputFile
     self.remote = remote
     self.remote = remote
     self.errorMessage = errorMessage
     self.errorMessage = errorMessage
-    pass
-
 
 
   def run(self):
   def run(self):
     scpcommand = ["scp",
     scpcommand = ["scp",
@@ -115,7 +116,6 @@ class SCP:
                   "-o", "StrictHostKeyChecking=no",
                   "-o", "StrictHostKeyChecking=no",
                   "-i", self.sshkey_file, self.inputFile, self.user + "@" +
                   "-i", self.sshkey_file, self.inputFile, self.user + "@" +
                                                           self.host + ":" + self.remote]
                                                           self.host + ":" + self.remote]
-
     i = 1
     i = 1
     while True:
     while True:
       try:
       try:
@@ -162,27 +162,33 @@ def main():
   parser.add_argument('--key', type=str,
   parser.add_argument('--key', type=str,
                       action='store', help='Path to GCE ssh key.')
                       action='store', help='Path to GCE ssh key.')
 
 
-  parser.add_argument('--cluster-prefix', type=str,
-                      action='store', help='Cluster name prefix.')
+  parser.add_argument('--cluster-suffix', type=str,
+                      action='store', help='Cluster name suffix.')
 
 
   parser.add_argument('--agent-prefix', type=str,
   parser.add_argument('--agent-prefix', type=str,
                       action='store', help='Agent name prefix.')
                       action='store', help='Agent name prefix.')
 
 
   parser.add_argument('--agents-count', type=int,
   parser.add_argument('--agents-count', type=int,
-                      action='store', help='Agents count for whole cluster(multiples of 50).')
+                      action='store', help='Agents count for whole cluster (multiples of 50).')
 
 
   if len(sys.argv) <= 1:
   if len(sys.argv) <= 1:
     parser.print_help()
     parser.print_help()
-  else:
-    args = parser.parse_args()
-    do_work(args)
+    sys.exit(-1)
+
+  args = parser.parse_args()
+  do_work(args)
 
 
-# base method which process cluster deployment
 def deploy_cluster(args):
 def deploy_cluster(args):
-  number_of_nodes=args.agents_count/number_of_agents_on_host
+  """
+  Process cluster deployment
+  :param args: Command line args.
+  """
+  # When dividing, need to get the ceil.
+  number_of_nodes = ((args.agents_count - 1) / NUMBER_OF_AGENTS_ON_HOST) + 1
+
   # trying to create cluster with needed params
   # trying to create cluster with needed params
-  print "Creating cluster {0}-{1} with {2} large nodes on centos6...".format(args.cluster_prefix, cluster_name, str(number_of_nodes))
-  execute_command(args, args.controller, "/usr/sbin/gce up {0}-{1} {2} --centos6 --large".format(args.cluster_prefix, cluster_name, str(number_of_nodes)),
+  print "Creating cluster {0}-{1} with {2} large nodes on centos6...".format(cluster_prefix, args.cluster_suffix, str(number_of_nodes))
+  execute_command(args, args.controller, "/usr/sbin/gce up {0}-{1} {2} --centos6 --large".format(cluster_prefix, args.cluster_suffix, str(number_of_nodes)),
                   "Failed to create cluster, probably not enough resources!", "-tt")
                   "Failed to create cluster, probably not enough resources!", "-tt")
 
 
   # VMs are not accessible immediately
   # VMs are not accessible immediately
@@ -190,19 +196,20 @@ def deploy_cluster(args):
 
 
   # getting list of vms information like hostname and ip address
   # getting list of vms information like hostname and ip address
   print "Getting list of virtual machines from cluster..."
   print "Getting list of virtual machines from cluster..."
+  # Dictionary from host name to IP
   vms = get_vms_list(args)
   vms = get_vms_list(args)
 
 
   # check number of nodes in cluster to be the same as user asked
   # check number of nodes in cluster to be the same as user asked
   print "Checking count of created nodes in cluster..."
   print "Checking count of created nodes in cluster..."
   if not vms or len(vms) < number_of_nodes:
   if not vms or len(vms) < number_of_nodes:
-    raise Exception("Can not bring up enough nodes. Requested {0}, but got: {1}. Probably not enough resources!".format(number_of_nodes, len(vms)))
+    raise Exception("Cannot bring up enough nodes. Requested {0}, but got {1}. Probably not enough resources!".format(number_of_nodes, len(vms)))
 
 
   print "GCE cluster was successfully created!"
   print "GCE cluster was successfully created!"
   pretty_print_vms(vms)
   pretty_print_vms(vms)
 
 
   # installing/starting ambari-server and ambari-agents on each host
   # installing/starting ambari-server and ambari-agents on each host
   server_host_name = sorted(vms.items())[0][0]
   server_host_name = sorted(vms.items())[0][0]
-  server_installed=False
+  server_installed = False
 
 
   print "Creating server.sh script (which will be executed on server to install/configure/start ambari-server and ambari-agent)..."
   print "Creating server.sh script (which will be executed on server to install/configure/start ambari-server and ambari-agent)..."
   create_server_script(args, server_host_name)
   create_server_script(args, server_host_name)
@@ -211,14 +218,33 @@ def deploy_cluster(args):
   create_agent_script(args, server_host_name)
   create_agent_script(args, server_host_name)
 
 
   time.sleep(10)
   time.sleep(10)
+
+  # If the user asks for a number of agents that is not a multiple of 50, then only create how many are needed instead
+  # of 50 on every VM.
+  num_agents_left_to_create = args.agents_count
+
+  start_num = 1
   for (hostname, ip) in sorted(vms.items()):
   for (hostname, ip) in sorted(vms.items()):
+    num_agents_on_this_host = min(num_agents_left_to_create, NUMBER_OF_AGENTS_ON_HOST)
+
     print "=========================="
     print "=========================="
-    print "Working on {0}".format(hostname)
+    print "Working on VM {0} that will contain hosts %d - %d".format(hostname, start_num, start_num + num_agents_on_this_host - 1)
+
+    # The agent multiplier config will be different on each VM.
+
+    cmd_generate_multiplier_conf = "mkdir -p /etc/ambari-agent/conf/ ; printf \"start={0}\\nnum={1}\\nprefix={2}\" > /etc/ambari-agent/conf/agent-multiplier.conf".format(start_num, num_agents_on_this_host, args.agent_prefix)
+    start_num += num_agents_on_this_host
+    num_agents_left_to_create -= num_agents_on_this_host
+
     if not server_installed:
     if not server_installed:
       remote_path = "/server.sh"
       remote_path = "/server.sh"
       local_path = "server.sh"
       local_path = "server.sh"
       print "Copying server.sh to {0}...".format(hostname)
       print "Copying server.sh to {0}...".format(hostname)
       put_file(args, ip, local_path, remote_path, "Failed to copy file!")
       put_file(args, ip, local_path, remote_path, "Failed to copy file!")
+
+      print "Generating agent-multiplier.conf"
+      execute_command(args, ip, cmd_generate_multiplier_conf, "Failed to generate agent-multiplier.conf on host {0}".format(hostname))
+
       print "Executing remote ssh command (set correct permissions and start executing server.sh in separate process) on {0}...".format(hostname)
       print "Executing remote ssh command (set correct permissions and start executing server.sh in separate process) on {0}...".format(hostname)
       execute_command(args, ip, "cd /; chmod 777 server.sh; nohup ./server.sh >/server.log 2>&1 &",
       execute_command(args, ip, "cd /; chmod 777 server.sh; nohup ./server.sh >/server.log 2>&1 &",
                     "Install/configure/start server script failed!")
                     "Install/configure/start server script failed!")
@@ -228,6 +254,10 @@ def deploy_cluster(args):
       local_path = "agent.sh"
       local_path = "agent.sh"
       print "Copying agent.sh to {0}...".format(hostname)
       print "Copying agent.sh to {0}...".format(hostname)
       put_file(args, ip, local_path, remote_path, "Failed to copy file!")
       put_file(args, ip, local_path, remote_path, "Failed to copy file!")
+
+      print "Generating agent-multiplier.conf"
+      execute_command(args, ip, cmd_generate_multiplier_conf, "Failed to generate agent-multiplier.conf on host {0}".format(hostname))
+
       print "Executing remote ssh command (set correct permissions and start executing agent.sh in separate process) on {0}...".format(hostname)
       print "Executing remote ssh command (set correct permissions and start executing agent.sh in separate process) on {0}...".format(hostname)
       execute_command(args, ip, "cd /; chmod 777 agent.sh; nohup ./agent.sh >/agent.log 2>&1 &",
       execute_command(args, ip, "cd /; chmod 777 agent.sh; nohup ./agent.sh >/agent.log 2>&1 &",
                     "Install/configure start agent script failed!")
                     "Install/configure start agent script failed!")
@@ -235,75 +265,90 @@ def deploy_cluster(args):
   print "All scripts where successfully copied and started on all hosts. " \
   print "All scripts where successfully copied and started on all hosts. " \
         "\nPay attention that server.sh script need 5 minutes to finish and agent.sh need 3 minutes!"
         "\nPay attention that server.sh script need 5 minutes to finish and agent.sh need 3 minutes!"
 
 
-# check if all required params were passed by user
-# if all needed params available then start cluster deploy
+
 def do_work(args):
 def do_work(args):
+  """
+  Check that all required args are passed in. If so, deploy the cluster.
+  :param args: Command line args
+  """
   if not args.controller:
   if not args.controller:
-    raise Failure("GCE controller ip address not defined!")
+    raise Exception("GCE controller ip address is not defined!")
 
 
   if not args.key:
   if not args.key:
-    raise Failure("Path to gce ssh key not defined!")
+    raise Exception("Path to gce ssh key is not defined!")
 
 
-  if not args.cluster_prefix:
-    raise Failure("Cluster name prefix not defined!")
+  if not args.cluster_suffix:
+    raise Exception("Cluster name suffix is not defined!")
 
 
   if not args.agent_prefix:
   if not args.agent_prefix:
-    raise Failure("Agent name prefix not defined!")
+    raise Exception("Agent name prefix is not defined!")
 
 
   if not args.agents_count:
   if not args.agents_count:
-    raise Failure("Agents count for whole cluster(multiples of 50) not defined!")
+    raise Exception("Agents count for whole cluster is not defined (will put 50 Agents per VM)!")
 
 
   deploy_cluster(args)
   deploy_cluster(args)
 
 
-# creating server.sh script in the same dir where current script is located
-# server.sh script will install, configure and start ambari-server and ambari-agent on host
-def create_server_script(args, server_host_name):
-  file = open("server.sh", "w")
-
-  file.write("#!/bin/bash\n")
-
-  file.write("wget -O /etc/yum.repos.d/ambari.repo {0}\n".format(ambari_repo_file_url))
-  file.write("yum clean all; yum install git ambari-server ambari-agent -y\n")
-  file.write("cd /home; git clone https://github.com/apache/ambari.git\n")
-
-  file.write("cp -r /home/ambari/ambari-server/src/main/resources/stacks/PERF /var/lib/ambari-server/resources/stacks/PERF\n")
-  file.write("cp -r /home/ambari/ambari-server/src/main/resources/stacks/PERF /var/lib/ambari-agent/cache/stacks/PERF\n")
-
-  file.write("ambari-server setup -s\n")
-  file.write("sed -i -e 's/false/true/g' /var/lib/ambari-server/resources/stacks/PERF/1.0/metainfo.xml\n")
-  file.write("ambari-server start --skip-database-check\n")
 
 
-  file.write("sed -i -e 's/hostname=localhost/hostname={0}/g' /etc/ambari-agent/conf/ambari-agent.ini\n".format(server_host_name))
-  file.write("sed -i -e 's/agent]/agent]\\nhostname_script={0}\\npublic_hostname_script={1}\\n/1' /etc/ambari-agent/conf/ambari-agent.ini\n".format(hostname_script, public_hostname_script))
-  file.write("printf \"start={0}\\nnum={1}\\nprefix={2}\" > /etc/ambari-agent/conf/agent-multiplier.conf\n".format(start_number, number_of_agents_on_host, args.agent_prefix))
-  file.write("python /home/ambari/ambari-agent/conf/unix/agent-multiplier.py start\n")
-
-  file.write("exit 0")
+def create_server_script(args, server_host_name):
+  """
+  Creating server.sh script in the same dir where current script is located
+  server.sh script will install, configure and start ambari-server and ambari-agent on host
+  :param args: Command line args
+  :param server_host_name: Server host name
+  """
+
+  contents = "#!/bin/bash\n" + \
+  "wget -O /etc/yum.repos.d/ambari.repo {0}\n".format(ambari_repo_file_url) + \
+  "yum clean all; yum install git ambari-server ambari-agent -y\n" + \
+  "cd /home; git clone https://github.com/apache/ambari.git\n" + \
+  "cp -r /home/ambari/ambari-server/src/main/resources/stacks/PERF /var/lib/ambari-server/resources/stacks/PERF\n" + \
+  "cp -r /home/ambari/ambari-server/src/main/resources/stacks/PERF /var/lib/ambari-agent/cache/stacks/PERF\n" + \
+  "ambari-server setup -s\n" + \
+  "sed -i -e 's/false/true/g' /var/lib/ambari-server/resources/stacks/PERF/1.0/metainfo.xml\n" + \
+  "ambari-server start --skip-database-check\n" + \
+  "sed -i -e 's/hostname=localhost/hostname={0}/g' /etc/ambari-agent/conf/ambari-agent.ini\n".format(server_host_name) + \
+  "sed -i -e 's/agent]/agent]\\nhostname_script={0}\\npublic_hostname_script={1}\\n/1' /etc/ambari-agent/conf/ambari-agent.ini\n".format(hostname_script, public_hostname_script) + \
+  "python /home/ambari/ambari-agent/conf/unix/agent-multiplier.py start\n" + \
+  "exit 0"
+
+  with open("server.sh", "w") as f:
+    f.write(contents)
 
 
-  file.close()
 
 
-# creating agent.sh script in the same dir where current script is located
-# agent.sh script will install, configure and start ambari-agent on host
 def create_agent_script(args, server_host_name):
 def create_agent_script(args, server_host_name):
-  file = open("agent.sh", "w")
-
-  file.write("#!/bin/bash\n")
-
-  file.write("wget -O /etc/yum.repos.d/ambari.repo {0}\n".format(ambari_repo_file_url))
-  file.write("yum clean all; yum install git ambari-agent -y\n")
-  file.write("cd /home; git clone https://github.com/apache/ambari.git\n")
-  file.write("cp -r /home/ambari/ambari-server/src/main/resources/stacks/PERF /var/lib/ambari-agent/cache/stacks/PERF\n")
+  """
+  Creating agent.sh script in the same dir where current script is located
+  agent.sh script will install, configure and start ambari-agent on host
+  :param args: Command line args
+  :param server_host_name: Server host name
+  """
+
+  # TODO, instead of cloning Ambari repo on each VM, do it on the server once and distribute to all of the agents.
+  contents = "#!/bin/bash\n" + \
+  "wget -O /etc/yum.repos.d/ambari.repo {0}\n".format(ambari_repo_file_url) + \
+  "yum clean all; yum install git ambari-agent -y\n" + \
+  "cd /home; git clone https://github.com/apache/ambari.git\n" + \
+  "cp -r /home/ambari/ambari-server/src/main/resources/stacks/PERF /var/lib/ambari-agent/cache/stacks/PERF\n" + \
+  "sed -i -e 's/hostname=localhost/hostname={0}/g' /etc/ambari-agent/conf/ambari-agent.ini\n".format(server_host_name) + \
+  "sed -i -e 's/agent]/agent]\\nhostname_script={0}\\npublic_hostname_script={1}\\n/1' /etc/ambari-agent/conf/ambari-agent.ini\n".format(hostname_script, public_hostname_script) + \
+  "python /home/ambari/ambari-agent/conf/unix/agent-multiplier.py start\n" + \
+  "exit 0"
+
+  with open("agent.sh", "w") as f:
+    f.write(contents)
 
 
-  file.write("sed -i -e 's/hostname=localhost/hostname={0}/g' /etc/ambari-agent/conf/ambari-agent.ini\n".format(server_host_name))
-  file.write("sed -i -e 's/agent]/agent]\\nhostname_script={0}\\npublic_hostname_script={1}\\n/1' /etc/ambari-agent/conf/ambari-agent.ini\n".format(hostname_script, public_hostname_script))
-  file.write("printf \"start={0}\\nnum={1}\\nprefix={2}\" > /etc/ambari-agent/conf/agent-multiplier.conf\n".format(start_number, number_of_agents_on_host, args.agent_prefix))
-  file.write("python /home/ambari/ambari-agent/conf/unix/agent-multiplier.py start\n")
-  file.write("exit 0")
 
 
-  file.close()
-
-# method to execute ssh commands via SSH class
 def execute_command(args, ip, cmd, fail_message, custom_option='', login='root'):
 def execute_command(args, ip, cmd, fail_message, custom_option='', login='root'):
+  """
+  Method to execute ssh commands via SSH class
+  :param args: Command line args
+  :param ip: IP to ssh to
+  :param cmd: Command to execute
+  :param fail_message: In case of an error, what to report
+  :param custom_option: Custom flags
+  :param login: Login user
+  :return: Return execute log message
+  """
   ssh = SSH(login, args.key, ip, cmd, custom_option, fail_message)
   ssh = SSH(login, args.key, ip, cmd, custom_option, fail_message)
   ssh_result = ssh.run()
   ssh_result = ssh.run()
   status_code = ssh_result["exitstatus"]
   status_code = ssh_result["exitstatus"]
@@ -312,8 +357,18 @@ def execute_command(args, ip, cmd, fail_message, custom_option='', login='root')
 
 
   return ssh_result["log"][0]
   return ssh_result["log"][0]
 
 
-# method to copy file from local to remote host via SCP class
+
 def put_file(args, ip, local_file, remote_file, fail_message, login='root'):
 def put_file(args, ip, local_file, remote_file, fail_message, login='root'):
+  """
+  Method to copy file from local to remote host via SCP class
+  :param args: Command line args
+  :param ip: IP to ssh to
+  :param local_file: Path to local file
+  :param remote_file: Path to remote file
+  :param fail_message: In case of an error, what to report
+  :param login: Login user.
+  :return: Return copy log message
+  """
   scp = SCP(login, args.key, ip, local_file,
   scp = SCP(login, args.key, ip, local_file,
             remote_file, fail_message)
             remote_file, fail_message)
   scp_result = scp.run()
   scp_result = scp.run()
@@ -323,10 +378,15 @@ def put_file(args, ip, local_file, remote_file, fail_message, login='root'):
 
 
   return scp_result["log"][0]
   return scp_result["log"][0]
 
 
-# method to parse "gce fqdn {cluster-name}" command output and get hosts and ips pairs for every host in cluster
+
 def get_vms_list(args):
 def get_vms_list(args):
-  gce_fqdb_cmd = '/usr/sbin/gce fqdn {0}-{1}'.format(
-    args.cluster_prefix, cluster_name)
+  """
+  Method to parse "gce fqdn {cluster-name}" command output and get hosts and ips pairs for every host in cluster
+  :param args: Command line args
+  :return: Mapping of VM host name to ip.
+  """
+
+  gce_fqdb_cmd = '/usr/sbin/gce fqdn {0}-{1}'.format(cluster_prefix, args.cluster_suffix)
   out = execute_command(args, args.controller, gce_fqdb_cmd, "Failed to get VMs list!", "-tt")
   out = execute_command(args, args.controller, gce_fqdb_cmd, "Failed to get VMs list!", "-tt")
   lines = out.split('\n')
   lines = out.split('\n')
   #print "LINES=" + str(lines)
   #print "LINES=" + str(lines)
@@ -339,15 +399,15 @@ def get_vms_list(args):
       if match:
       if match:
         result[match.group(2)] = match.group(1)
         result[match.group(2)] = match.group(1)
       else:
       else:
-        raise Exception('Can not parse "{0}"'.format(s))
+        raise Exception('Cannot parse "{0}"'.format(s))
     return result
     return result
   else:
   else:
-    raise Exception('Can not parse "{0}"'.format(lines))
+    raise Exception('Cannot parse "{0}"'.format(lines))
 
 
 
 
 def pretty_print_vms(vms):
 def pretty_print_vms(vms):
   print "----------------------------"
   print "----------------------------"
-  print "server ip: {0}".format(sorted(vms.items())[0][1])
+  print "Server IP: {0}".format(sorted(vms.items())[0][1])
   print "Hostnames of nodes in cluster:"
   print "Hostnames of nodes in cluster:"
   for (hostname, ip) in sorted(vms.items()):
   for (hostname, ip) in sorted(vms.items()):
     print hostname
     print hostname