deploy-gce-perf-cluster.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. #!/usr/bin/env python
  2. """
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. """
  17. import argparse
  18. import os
  19. import subprocess
  20. import sys
  21. import pprint
  22. import time
  23. import traceback
  24. import re
  25. import socket
  26. cluster_name="perf-cluster"
  27. ambari_repo_file_url="http://s3.amazonaws.com/dev.hortonworks.com/ambari/centos6/2.x/latest/trunk/ambaribn.repo"
  28. public_hostname_script="foo"
  29. hostname_script="foo"
  30. start_number=1
  31. number_of_agents_on_host=50
  32. class SSH:
  33. """ Ssh implementation of this """
  34. def __init__(self, user, sshkey_file, host, command, custom_option='', errorMessage = None):
  35. self.user = user
  36. self.sshkey_file = sshkey_file
  37. self.host = host
  38. self.command = command
  39. self.errorMessage = errorMessage
  40. self.custom_option = custom_option
  41. pass
  42. def run(self):
  43. sshcommand = ["ssh",
  44. "-o", "ConnectTimeOut=180",
  45. "-o", "StrictHostKeyChecking=no",
  46. "-o", "BatchMode=yes",
  47. self.custom_option,
  48. "-i", self.sshkey_file,
  49. self.user + "@" + self.host, self.command]
  50. if not self.custom_option:
  51. del sshcommand[7]
  52. i = 1
  53. while True:
  54. try:
  55. sshstat = subprocess.Popen(sshcommand, stdout=subprocess.PIPE,
  56. stderr=subprocess.PIPE)
  57. log = sshstat.communicate()
  58. if sshstat.returncode != 0:
  59. print "Executing SSH command on {0} failed: {1}".format(self.host, log)
  60. print "\nRetrying SSH command one more time!"
  61. if i >= 3:
  62. break
  63. i += 1
  64. time.sleep(10)
  65. continue
  66. break
  67. except:
  68. print "Could not SSH to {0}, waiting for it to start".format(self.host)
  69. i += 1
  70. time.sleep(10)
  71. if i >= 3:
  72. print "Could not execute remote ssh command: " + ' '.join(sshcommand)
  73. raise Exception("Could not connect to {0}. Giving up with erros: {1}".format(self.host, log))
  74. errorMsg = log[1]
  75. if self.errorMessage and sshstat.returncode != 0:
  76. errorMsg = self.errorMessage + "\n" + errorMsg
  77. print "SSH command execution finished"
  78. return {"exitstatus": sshstat.returncode, "log": log, "errormsg": errorMsg}
  79. class SCP:
  80. """ SCP implementation that is thread based. The status can be returned using
  81. status val """
  82. def __init__(self, user, sshkey_file, host, inputFile, remote, errorMessage = None):
  83. self.user = user
  84. self.sshkey_file = sshkey_file
  85. self.host = host
  86. self.inputFile = inputFile
  87. self.remote = remote
  88. self.errorMessage = errorMessage
  89. pass
  90. def run(self):
  91. scpcommand = ["scp",
  92. "-r",
  93. "-o", "ConnectTimeout=60",
  94. "-o", "BatchMode=yes",
  95. "-o", "StrictHostKeyChecking=no",
  96. "-i", self.sshkey_file, self.inputFile, self.user + "@" +
  97. self.host + ":" + self.remote]
  98. i = 1
  99. while True:
  100. try:
  101. scpstat = subprocess.Popen(scpcommand, stdout=subprocess.PIPE,
  102. stderr=subprocess.PIPE)
  103. log = scpstat.communicate()
  104. if scpstat.returncode != 0:
  105. print "Executing SCP command on {0} failed: {1}".format(self.host, log)
  106. print "\nRetrying SCP command one more time!"
  107. if i >= 3:
  108. break
  109. i += 1
  110. time.sleep(10)
  111. continue
  112. break
  113. except:
  114. print "Could not SCP to {0}, waiting for it to start".format(self.host)
  115. i += 1
  116. time.sleep(10)
  117. if i >= 3:
  118. print "Could not execute remote scp command: " + ' '.join(scpcommand)
  119. raise Exception("Could not connect to {0}. Giving up with erros: {1}".format(self.host, log))
  120. errorMsg = log[1]
  121. if self.errorMessage and scpstat.returncode != 0:
  122. errorMsg = self.errorMessage + "\n" + errorMsg
  123. print "SCP command execution finished"
  124. return {"exitstatus": scpstat.returncode, "log": log, "errormsg": errorMsg}
  125. # main method to parse arguments from user and start work
  126. def main():
  127. parser = argparse.ArgumentParser(
  128. description='This script brings up a cluster with ambari installed, configured and started',
  129. epilog='Only GCE is supported as of now!'
  130. )
  131. # options
  132. parser.add_argument('--controller', type=str,
  133. action='store', help='GCE controller ip address.')
  134. parser.add_argument('--key', type=str,
  135. action='store', help='Path to GCE ssh key.')
  136. parser.add_argument('--cluster-prefix', type=str,
  137. action='store', help='Cluster name prefix.')
  138. parser.add_argument('--agent-prefix', type=str,
  139. action='store', help='Agent name prefix.')
  140. parser.add_argument('--agents-count', type=int,
  141. action='store', help='Agents count for whole cluster(multiples of 50).')
  142. if len(sys.argv) <= 1:
  143. parser.print_help()
  144. else:
  145. args = parser.parse_args()
  146. do_work(args)
  147. # base method which process cluster deployment
  148. def deploy_cluster(args):
  149. number_of_nodes=args.agents_count/number_of_agents_on_host
  150. # trying to create cluster with needed params
  151. print "Creating cluster {0}-{1} with {2} large nodes on centos6...".format(args.cluster_prefix, cluster_name, str(number_of_nodes))
  152. execute_command(args, args.controller, "/usr/sbin/gce up {0}-{1} {2} --centos6 --large".format(args.cluster_prefix, cluster_name, str(number_of_nodes)),
  153. "Failed to create cluster, probably not enough resources!", "-tt")
  154. # VMs are not accessible immediately
  155. time.sleep(10)
  156. # getting list of vms information like hostname and ip address
  157. print "Getting list of virtual machines from cluster..."
  158. vms = get_vms_list(args)
  159. # check number of nodes in cluster to be the same as user asked
  160. print "Checking count of created nodes in cluster..."
  161. if not vms or len(vms) < number_of_nodes:
  162. raise Exception("Can not bring up enough nodes. Requested {0}, but got: {1}. Probably not enough resources!".format(number_of_nodes, len(vms)))
  163. print "GCE cluster was successfully created!"
  164. pretty_print_vms(vms)
  165. # installing/starting ambari-server and ambari-agents on each host
  166. server_host_name = sorted(vms.items())[0][0]
  167. server_installed=False
  168. print "Creating server.sh script (which will be executed on server to install/configure/start ambari-server and ambari-agent)..."
  169. create_server_script(args, server_host_name)
  170. print "Creating agent.sh script (which will be executed on agent hosts to install/configure/start ambari-agent..."
  171. create_agent_script(args, server_host_name)
  172. time.sleep(10)
  173. for (hostname, ip) in sorted(vms.items()):
  174. print "=========================="
  175. print "Working on {0}".format(hostname)
  176. if not server_installed:
  177. remote_path = "/server.sh"
  178. local_path = "server.sh"
  179. print "Copying server.sh to {0}...".format(hostname)
  180. put_file(args, ip, local_path, remote_path, "Failed to copy file!")
  181. print "Executing remote ssh command (set correct permissions and start executing server.sh in separate process) on {0}...".format(hostname)
  182. execute_command(args, ip, "cd /; chmod 777 server.sh; nohup ./server.sh >/server.log 2>&1 &",
  183. "Install/configure/start server script failed!")
  184. server_installed = True
  185. else:
  186. remote_path = "/agent.sh"
  187. local_path = "agent.sh"
  188. print "Copying agent.sh to {0}...".format(hostname)
  189. put_file(args, ip, local_path, remote_path, "Failed to copy file!")
  190. print "Executing remote ssh command (set correct permissions and start executing agent.sh in separate process) on {0}...".format(hostname)
  191. execute_command(args, ip, "cd /; chmod 777 agent.sh; nohup ./agent.sh >/agent.log 2>&1 &",
  192. "Install/configure start agent script failed!")
  193. print "All scripts where successfully copied and started on all hosts. " \
  194. "\nPay attention that server.sh script need 5 minutes to finish and agent.sh need 3 minutes!"
  195. # check if all required params were passed by user
  196. # if all needed params available then start cluster deploy
  197. def do_work(args):
  198. if not args.controller:
  199. raise Failure("GCE controller ip address not defined!")
  200. if not args.key:
  201. raise Failure("Path to gce ssh key not defined!")
  202. if not args.cluster_prefix:
  203. raise Failure("Cluster name prefix not defined!")
  204. if not args.agent_prefix:
  205. raise Failure("Agent name prefix not defined!")
  206. if not args.agents_count:
  207. raise Failure("Agents count for whole cluster(multiples of 50) not defined!")
  208. deploy_cluster(args)
  209. # creating server.sh script in the same dir where current script is located
  210. # server.sh script will install, configure and start ambari-server and ambari-agent on host
  211. def create_server_script(args, server_host_name):
  212. file = open("server.sh", "w")
  213. file.write("#!/bin/bash\n")
  214. file.write("wget -O /etc/yum.repos.d/ambari.repo {0}\n".format(ambari_repo_file_url))
  215. file.write("yum clean all; yum install git ambari-server ambari-agent -y\n")
  216. file.write("cd /home; git clone https://github.com/apache/ambari.git\n")
  217. file.write("cp -r /home/ambari/ambari-server/src/main/resources/stacks/PERF /var/lib/ambari-server/resources/stacks/PERF\n")
  218. file.write("cp -r /home/ambari/ambari-server/src/main/resources/stacks/PERF /var/lib/ambari-agent/cache/stacks/PERF\n")
  219. file.write("ambari-server setup -s\n")
  220. file.write("sed -i -e 's/false/true/g' /var/lib/ambari-server/resources/stacks/PERF/1.0/metainfo.xml\n")
  221. file.write("ambari-server start --skip-database-check\n")
  222. file.write("sed -i -e 's/hostname=localhost/hostname={0}/g' /etc/ambari-agent/conf/ambari-agent.ini\n".format(server_host_name))
  223. file.write("sed -i -e 's/agent]/agent]\\nhostname_script={0}\\npublic_hostname_script={1}\\n/1' /etc/ambari-agent/conf/ambari-agent.ini\n".format(hostname_script, public_hostname_script))
  224. file.write("printf \"start={0}\\nnum={1}\\nprefix={2}\" > /etc/ambari-agent/conf/agent-multiplier.conf\n".format(start_number, number_of_agents_on_host, args.agent_prefix))
  225. file.write("python /home/ambari/ambari-agent/conf/unix/agent-multiplier.py start\n")
  226. file.write("exit 0")
  227. file.close()
  228. # creating agent.sh script in the same dir where current script is located
  229. # agent.sh script will install, configure and start ambari-agent on host
  230. def create_agent_script(args, server_host_name):
  231. file = open("agent.sh", "w")
  232. file.write("#!/bin/bash\n")
  233. file.write("wget -O /etc/yum.repos.d/ambari.repo {0}\n".format(ambari_repo_file_url))
  234. file.write("yum clean all; yum install git ambari-agent -y\n")
  235. file.write("cd /home; git clone https://github.com/apache/ambari.git\n")
  236. file.write("cp -r /home/ambari/ambari-server/src/main/resources/stacks/PERF /var/lib/ambari-agent/cache/stacks/PERF\n")
  237. file.write("sed -i -e 's/hostname=localhost/hostname={0}/g' /etc/ambari-agent/conf/ambari-agent.ini\n".format(server_host_name))
  238. file.write("sed -i -e 's/agent]/agent]\\nhostname_script={0}\\npublic_hostname_script={1}\\n/1' /etc/ambari-agent/conf/ambari-agent.ini\n".format(hostname_script, public_hostname_script))
  239. file.write("printf \"start={0}\\nnum={1}\\nprefix={2}\" > /etc/ambari-agent/conf/agent-multiplier.conf\n".format(start_number, number_of_agents_on_host, args.agent_prefix))
  240. file.write("python /home/ambari/ambari-agent/conf/unix/agent-multiplier.py start\n")
  241. file.write("exit 0")
  242. file.close()
  243. # method to execute ssh commands via SSH class
  244. def execute_command(args, ip, cmd, fail_message, custom_option='', login='root'):
  245. ssh = SSH(login, args.key, ip, cmd, custom_option, fail_message)
  246. ssh_result = ssh.run()
  247. status_code = ssh_result["exitstatus"]
  248. if status_code != 0:
  249. raise Exception(ssh_result["errormsg"])
  250. return ssh_result["log"][0]
  251. # method to copy file from local to remote host via SCP class
  252. def put_file(args, ip, local_file, remote_file, fail_message, login='root'):
  253. scp = SCP(login, args.key, ip, local_file,
  254. remote_file, fail_message)
  255. scp_result = scp.run()
  256. status_code = scp_result["exitstatus"]
  257. if status_code != 0:
  258. raise Exception(scp_result["errormsg"])
  259. return scp_result["log"][0]
  260. # method to parse "gce fqdn {cluster-name}" command output and get hosts and ips pairs for every host in cluster
  261. def get_vms_list(args):
  262. gce_fqdb_cmd = '/usr/sbin/gce fqdn {0}-{1}'.format(
  263. args.cluster_prefix, cluster_name)
  264. out = execute_command(args, args.controller, gce_fqdb_cmd, "Failed to get VMs list!", "-tt")
  265. lines = out.split('\n')
  266. #print "LINES=" + str(lines)
  267. if lines[0].startswith("Using profile") and not lines[1].strip():
  268. result = {}
  269. for s in lines[2:]: # Ignore non-meaningful lines
  270. if not s:
  271. continue
  272. match = re.match(r'^([\d\.]*)\s+([\w\.-]*)\s+([\w\.-]*)\s$', s, re.M)
  273. if match:
  274. result[match.group(2)] = match.group(1)
  275. else:
  276. raise Exception('Can not parse "{0}"'.format(s))
  277. return result
  278. else:
  279. raise Exception('Can not parse "{0}"'.format(lines))
  280. def pretty_print_vms(vms):
  281. print "----------------------------"
  282. print "server ip: {0}".format(sorted(vms.items())[0][1])
  283. print "Hostnames of nodes in cluster:"
  284. for (hostname, ip) in sorted(vms.items()):
  285. print hostname
  286. print "----------------------------"
# Start the deployment only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()