瀏覽代碼

AMBARI-18740. Perf: Automate deployment of PERF stack on 2500+ Ambari Agents.(vbrodetskyi)

Vitaly Brodetskyi 8 年之前
父節點
當前提交
4c2c50aa4d
共有 1 個文件被更改,包括 360 次插入0 次删除
  1. 360 0
      contrib/utils/perf/deploy-gce-perf-cluster.py

+ 360 - 0
contrib/utils/perf/deploy-gce-perf-cluster.py

@@ -0,0 +1,360 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import argparse
+import os
+import subprocess
+import sys
+import pprint
+import time
+import traceback
+import re
+import socket
+
+cluster_name="perf-cluster"
+ambari_repo_file_url="http://s3.amazonaws.com/dev.hortonworks.com/ambari/centos6/2.x/latest/trunk/ambaribn.repo"
+
+public_hostname_script="foo"
+hostname_script="foo"
+
+start_number=1
+number_of_agents_on_host=50
+
+
+class SSH:
+  """ Ssh implementation of this """
+  def __init__(self, user, sshkey_file, host, command, custom_option='', errorMessage = None):
+    self.user = user
+    self.sshkey_file = sshkey_file
+    self.host = host
+    self.command = command
+    self.errorMessage = errorMessage
+    self.custom_option = custom_option
+    pass
+
+
+  def run(self):
+    sshcommand = ["ssh",
+                  "-o", "ConnectTimeOut=180",
+                  "-o", "StrictHostKeyChecking=no",
+                  "-o", "BatchMode=yes",
+                  self.custom_option,
+                  "-i", self.sshkey_file,
+                  self.user + "@" + self.host, self.command]
+
+    if not self.custom_option:
+      del sshcommand[7]
+
+    i = 1
+    while True:
+      try:
+        sshstat = subprocess.Popen(sshcommand, stdout=subprocess.PIPE,
+                               stderr=subprocess.PIPE)
+        log = sshstat.communicate()
+        if sshstat.returncode != 0:
+          print "Executing SSH command on {0} failed: {1}".format(self.host, log)
+          print "\nRetrying SSH command one more time!"
+          if i >= 3:
+            break
+          i += 1
+          time.sleep(10)
+          continue
+        break
+      except:
+        print "Could not SSH to {0}, waiting for it to start".format(self.host)
+        i += 1
+        time.sleep(10)
+
+    if i >= 3:
+      print "Could not execute remote ssh command: " + ' '.join(sshcommand)
+      raise Exception("Could not connect to {0}. Giving up with erros: {1}".format(self.host, log))
+
+    errorMsg = log[1]
+    if self.errorMessage and sshstat.returncode != 0:
+      errorMsg = self.errorMessage + "\n" + errorMsg
+
+    print "SSH command execution finished"
+
+    return  {"exitstatus": sshstat.returncode, "log": log, "errormsg": errorMsg}
+
+
+class SCP:
+  """ SCP implementation that is thread based. The status can be returned using
+   status val """
+  def __init__(self, user, sshkey_file, host, inputFile, remote, errorMessage = None):
+    self.user = user
+    self.sshkey_file = sshkey_file
+    self.host = host
+    self.inputFile = inputFile
+    self.remote = remote
+    self.errorMessage = errorMessage
+    pass
+
+
+  def run(self):
+    scpcommand = ["scp",
+                  "-r",
+                  "-o", "ConnectTimeout=60",
+                  "-o", "BatchMode=yes",
+                  "-o", "StrictHostKeyChecking=no",
+                  "-i", self.sshkey_file, self.inputFile, self.user + "@" +
+                                                          self.host + ":" + self.remote]
+
+    i = 1
+    while True:
+      try:
+        scpstat = subprocess.Popen(scpcommand, stdout=subprocess.PIPE,
+                                   stderr=subprocess.PIPE)
+        log = scpstat.communicate()
+        if scpstat.returncode != 0:
+          print "Executing SCP command on {0} failed: {1}".format(self.host, log)
+          print "\nRetrying SCP command one more time!"
+          if i >= 3:
+            break
+          i += 1
+          time.sleep(10)
+          continue
+        break
+      except:
+        print "Could not SCP to {0}, waiting for it to start".format(self.host)
+        i += 1
+        time.sleep(10)
+
+      if i >= 3:
+        print "Could not execute remote scp command: " + ' '.join(scpcommand)
+        raise Exception("Could not connect to {0}. Giving up with erros: {1}".format(self.host, log))
+
+    errorMsg = log[1]
+    if self.errorMessage and scpstat.returncode != 0:
+      errorMsg = self.errorMessage + "\n" + errorMsg
+
+    print "SCP command execution finished"
+
+    return {"exitstatus": scpstat.returncode, "log": log, "errormsg": errorMsg}
+
+# main method to parse arguments from user and start work
+def main():
+  parser = argparse.ArgumentParser(
+    description='This script brings up a cluster with ambari installed, configured and started',
+    epilog='Only GCE is supported as of now!'
+  )
+
+  # options
+  parser.add_argument('--controller', type=str,
+                      action='store', help='GCE controller ip address.')
+
+  parser.add_argument('--key', type=str,
+                      action='store', help='Path to GCE ssh key.')
+
+  parser.add_argument('--cluster-prefix', type=str,
+                      action='store', help='Cluster name prefix.')
+
+  parser.add_argument('--agent-prefix', type=str,
+                      action='store', help='Agent name prefix.')
+
+  parser.add_argument('--agents-count', type=int,
+                      action='store', help='Agents count for whole cluster(multiples of 50).')
+
+  if len(sys.argv) <= 1:
+    parser.print_help()
+  else:
+    args = parser.parse_args()
+    do_work(args)
+
+# base method which process cluster deployment
+def deploy_cluster(args):
+  number_of_nodes=args.agents_count/number_of_agents_on_host
+  # trying to create cluster with needed params
+  print "Creating cluster {0}-{1} with {2} large nodes on centos6...".format(args.cluster_prefix, cluster_name, str(number_of_nodes))
+  execute_command(args, args.controller, "/usr/sbin/gce up {0}-{1} {2} --centos6 --large".format(args.cluster_prefix, cluster_name, str(number_of_nodes)),
+                  "Failed to create cluster, probably not enough resources!", "-tt")
+
+  # VMs are not accessible immediately
+  time.sleep(10)
+
+  # getting list of vms information like hostname and ip address
+  print "Getting list of virtual machines from cluster..."
+  vms = get_vms_list(args)
+
+  # check number of nodes in cluster to be the same as user asked
+  print "Checking count of created nodes in cluster..."
+  if not vms or len(vms) < number_of_nodes:
+    raise Exception("Can not bring up enough nodes. Requested {0}, but got: {1}. Probably not enough resources!".format(number_of_nodes, len(vms)))
+
+  print "GCE cluster was successfully created!"
+  pretty_print_vms(vms)
+
+  # installing/starting ambari-server and ambari-agents on each host
+  server_host_name = sorted(vms.items())[0][0]
+  server_installed=False
+
+  print "Creating server.sh script (which will be executed on server to install/configure/start ambari-server and ambari-agent)..."
+  create_server_script(args, server_host_name)
+
+  print "Creating agent.sh script (which will be executed on agent hosts to install/configure/start ambari-agent..."
+  create_agent_script(args, server_host_name)
+
+  time.sleep(10)
+  for (hostname, ip) in sorted(vms.items()):
+    print "=========================="
+    print "Working on {0}".format(hostname)
+    if not server_installed:
+      remote_path = "/server.sh"
+      local_path = "server.sh"
+      print "Copying server.sh to {0}...".format(hostname)
+      put_file(args, ip, local_path, remote_path, "Failed to copy file!")
+      print "Executing remote ssh command (set correct permissions and start executing server.sh in separate process) on {0}...".format(hostname)
+      execute_command(args, ip, "cd /; chmod 777 server.sh; nohup ./server.sh >/server.log 2>&1 &",
+                    "Install/configure/start server script failed!")
+      server_installed = True
+    else:
+      remote_path = "/agent.sh"
+      local_path = "agent.sh"
+      print "Copying agent.sh to {0}...".format(hostname)
+      put_file(args, ip, local_path, remote_path, "Failed to copy file!")
+      print "Executing remote ssh command (set correct permissions and start executing agent.sh in separate process) on {0}...".format(hostname)
+      execute_command(args, ip, "cd /; chmod 777 agent.sh; nohup ./agent.sh >/agent.log 2>&1 &",
+                    "Install/configure start agent script failed!")
+
+  print "All scripts where successfully copied and started on all hosts. " \
+        "\nPay attention that server.sh script need 5 minutes to finish and agent.sh need 3 minutes!"
+
+# check if all required params were passed by user
+# if all needed params available then start cluster deploy
+def do_work(args):
+  if not args.controller:
+    raise Failure("GCE controller ip address not defined!")
+
+  if not args.key:
+    raise Failure("Path to gce ssh key not defined!")
+
+  if not args.cluster_prefix:
+    raise Failure("Cluster name prefix not defined!")
+
+  if not args.agent_prefix:
+    raise Failure("Agent name prefix not defined!")
+
+  if not args.agents_count:
+    raise Failure("Agents count for whole cluster(multiples of 50) not defined!")
+
+  deploy_cluster(args)
+
+# creating server.sh script in the same dir where current script is located
+# server.sh script will install, configure and start ambari-server and ambari-agent on host
+def create_server_script(args, server_host_name):
+  file = open("server.sh", "w")
+
+  file.write("#!/bin/bash\n")
+
+  file.write("wget -O /etc/yum.repos.d/ambari.repo {0}\n".format(ambari_repo_file_url))
+  file.write("yum clean all; yum install git ambari-server ambari-agent -y\n")
+  file.write("cd /home; git clone https://github.com/apache/ambari.git\n")
+
+  file.write("cp -r /home/ambari/ambari-server/src/main/resources/stacks/PERF /var/lib/ambari-server/resources/stacks/PERF\n")
+  file.write("cp -r /home/ambari/ambari-server/src/main/resources/stacks/PERF /var/lib/ambari-agent/cache/stacks/PERF\n")
+
+  file.write("ambari-server setup -s\n")
+  file.write("sed -i -e 's/false/true/g' /var/lib/ambari-server/resources/stacks/PERF/1.0/metainfo.xml\n")
+  file.write("ambari-server start --skip-database-check\n")
+
+  file.write("sed -i -e 's/hostname=localhost/hostname={0}/g' /etc/ambari-agent/conf/ambari-agent.ini\n".format(server_host_name))
+  file.write("sed -i -e 's/agent]/agent]\\nhostname_script={0}\\npublic_hostname_script={1}\\n/1' /etc/ambari-agent/conf/ambari-agent.ini\n".format(hostname_script, public_hostname_script))
+  file.write("printf \"start={0}\\nnum={1}\\nprefix={2}\" > /etc/ambari-agent/conf/agent-multiplier.conf\n".format(start_number, number_of_agents_on_host, args.agent_prefix))
+  file.write("python /home/ambari/ambari-agent/conf/unix/agent-multiplier.py start\n")
+
+  file.write("exit 0")
+
+  file.close()
+
+# creating agent.sh script in the same dir where current script is located
+# agent.sh script will install, configure and start ambari-agent on host
+def create_agent_script(args, server_host_name):
+  file = open("agent.sh", "w")
+
+  file.write("#!/bin/bash\n")
+
+  file.write("wget -O /etc/yum.repos.d/ambari.repo {0}\n".format(ambari_repo_file_url))
+  file.write("yum clean all; yum install git ambari-agent -y\n")
+  file.write("cd /home; git clone https://github.com/apache/ambari.git\n")
+  file.write("cp -r /home/ambari/ambari-server/src/main/resources/stacks/PERF /var/lib/ambari-agent/cache/stacks/PERF\n")
+
+  file.write("sed -i -e 's/hostname=localhost/hostname={0}/g' /etc/ambari-agent/conf/ambari-agent.ini\n".format(server_host_name))
+  file.write("sed -i -e 's/agent]/agent]\\nhostname_script={0}\\npublic_hostname_script={1}\\n/1' /etc/ambari-agent/conf/ambari-agent.ini\n".format(hostname_script, public_hostname_script))
+  file.write("printf \"start={0}\\nnum={1}\\nprefix={2}\" > /etc/ambari-agent/conf/agent-multiplier.conf\n".format(start_number, number_of_agents_on_host, args.agent_prefix))
+  file.write("python /home/ambari/ambari-agent/conf/unix/agent-multiplier.py start\n")
+  file.write("exit 0")
+
+  file.close()
+
+# method to execute ssh commands via SSH class
+def execute_command(args, ip, cmd, fail_message, custom_option='', login='root'):
+  ssh = SSH(login, args.key, ip, cmd, custom_option, fail_message)
+  ssh_result = ssh.run()
+  status_code = ssh_result["exitstatus"]
+  if status_code != 0:
+    raise Exception(ssh_result["errormsg"])
+
+  return ssh_result["log"][0]
+
+# method to copy file from local to remote host via SCP class
+def put_file(args, ip, local_file, remote_file, fail_message, login='root'):
+  scp = SCP(login, args.key, ip, local_file,
+            remote_file, fail_message)
+  scp_result = scp.run()
+  status_code = scp_result["exitstatus"]
+  if status_code != 0:
+    raise Exception(scp_result["errormsg"])
+
+  return scp_result["log"][0]
+
+# method to parse "gce fqdn {cluster-name}" command output and get hosts and ips pairs for every host in cluster
+def get_vms_list(args):
+  gce_fqdb_cmd = '/usr/sbin/gce fqdn {0}-{1}'.format(
+    args.cluster_prefix, cluster_name)
+  out = execute_command(args, args.controller, gce_fqdb_cmd, "Failed to get VMs list!", "-tt")
+  lines = out.split('\n')
+  #print "LINES=" + str(lines)
+  if lines[0].startswith("Using profile") and not lines[1].strip():
+    result = {}
+    for s in lines[2:]:  # Ignore non-meaningful lines
+      if not s:
+        continue
+      match = re.match(r'^([\d\.]*)\s+([\w\.-]*)\s+([\w\.-]*)\s$', s, re.M)
+      if match:
+        result[match.group(2)] = match.group(1)
+      else:
+        raise Exception('Can not parse "{0}"'.format(s))
+    return result
+  else:
+    raise Exception('Can not parse "{0}"'.format(lines))
+
+
+def pretty_print_vms(vms):
+  print "----------------------------"
+  print "server ip: {0}".format(sorted(vms.items())[0][1])
+  print "Hostnames of nodes in cluster:"
+  for (hostname, ip) in sorted(vms.items()):
+    print hostname
+  print "----------------------------"
+
+
+if __name__ == "__main__":
+  main()
+
+