agent-multiplier.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import errno
import os
import re
import shutil
import stat
import subprocess
import sys
from optparse import OptionParser
  21. class Host:
  22. """
  23. Abstraction of the elements unique to each Ambari Agent running on this VM.
  24. """
  25. def __init__(self, host_name, ping_port, home_dir):
  26. self.host_name = host_name
  27. self.ping_port = ping_port
  28. self.home_dir = home_dir
  29. class Multiplier:
  30. """
  31. In order to perform scale testing, this class bootstraps multiple Ambari Agents to run on the same host.
  32. Each Ambari Agent has its own home directory with subdirectories for configs, logs, etc.
  33. Further, each agent is given a unique port number.
  34. Usage: python agent-multiplier.py [command]
  35. [command] = start | stop | restart | status
  36. Optional flags:
  37. -v --verbose : Increase logging
  38. """
  39. CONFIG_FILE = "/etc/ambari-agent/conf/agent-multiplier.conf"
  40. def __init__(self, args):
  41. parser = OptionParser()
  42. parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False,
  43. help="Verbose logging")
  44. (options, args) = parser.parse_args(args)
  45. self.verbose = options.verbose
  46. self.home_dir = "/home/"
  47. # Subdirectories to create inside the home dir for the given agent.
  48. self.log_dir = "/var/log/ambari-agent"
  49. self.config_dir = "/etc/ambari-agent/conf"
  50. self.pid_file = "/var/run/ambari-agent/ambari-agent.pid"
  51. # Ambari Agent config file to use as a template
  52. # Will change hostname and port after copying
  53. self.source_config_file = "/etc/ambari-agent/conf/ambari-agent.ini"
  54. self.base_ping_port = 5000
  55. self.start = 0
  56. self.num = 0
  57. self.prefix = None
  58. # Parse above params from config file, which must exist
  59. self.parse_configs()
  60. if len(args) != 2:
  61. print "Sample Usage: python agent_multiplier.py [action]\n" \
  62. "actions: start|stop|restart|status"
  63. self.command = args[1]
  64. # Validate configs
  65. self.validate()
  66. print "*** Params ***"
  67. print "Start: %d" % self.start
  68. print "Num: %d" % self.num
  69. print "Prefix: %s" % self.prefix
  70. print "Command: %s" % self.command
  71. # All hostnames that will be managed by Ambari Agents on this host
  72. self.hosts = []
  73. for i in range(self.start, self.start + self.num):
  74. host_name = "%s-%04d" % (self.prefix, i)
  75. host_home_dir = os.path.join(self.home_dir, host_name)
  76. host = Host(host_name, self.base_ping_port + i, host_home_dir)
  77. self.hosts.append(host)
  78. self.bootstrap()
  79. def parse_configs(self):
  80. """
  81. Parse the configuration file to set the config params.
  82. """
  83. if not os.path.exists(self.CONFIG_FILE):
  84. print "Did not find Agent Multiplier config file: %s" % str(self.CONFIG_FILE)
  85. sys.exit(-1)
  86. params = {}
  87. with open(self.CONFIG_FILE, "r") as f:
  88. for line in f.readlines():
  89. index = line.index("=") if "=" in line else None
  90. if index is not None:
  91. config = line[0:index].strip()
  92. value = line[index+1:].strip()
  93. params[config] = value
  94. # Convert some configs to ints
  95. if "start" in params:
  96. self.start = int(params["start"])
  97. if "num" in params:
  98. self.num = int(params["num"])
  99. if "prefix" in params:
  100. self.prefix = params["prefix"].strip().lower()
  101. def validate(self):
  102. """
  103. Validate the configs are non-empty and contain correct ranges.
  104. On error, will exit with code -1
  105. """
  106. errors = []
  107. if self.start <= 0:
  108. errors.append("Start must be a positive integer")
  109. if self.num <= 0:
  110. errors.append("Number of agents on this host must be a positive integer")
  111. if self.prefix is None or self.prefix.strip() == "":
  112. errors.append("Prefix is a required field")
  113. if not os.path.isfile(self.source_config_file):
  114. errors.append("Ambari Agent config file does not exist at %s" % self.source_config_file)
  115. valid_commands = set(["start", "stop", "restart", "status"])
  116. if self.command is None or self.command not in valid_commands:
  117. errors.append("Command must be one of %s" % ", ".join(valid_commands))
  118. if len(errors) > 0:
  119. print "Error:"
  120. print "\n".join(errors)
  121. sys.exit(-1)
  122. def bootstrap(self):
  123. """
  124. Bootstrap each Ambari Agent that will run on this host with the directories it needs and configuration file.
  125. """
  126. for host in self.hosts:
  127. host_name = host.host_name
  128. host_home_dir = host.home_dir
  129. host_log_dir = host_home_dir + self.log_dir
  130. host_config_dir = host_home_dir + self.config_dir
  131. host_pid_file = host_home_dir + self.pid_file
  132. host_pid_dir = os.path.dirname(host_pid_file)
  133. if self.verbose:
  134. print "Analyzing host %s with port %d" % (host_name, host.ping_port)
  135. for dir in [host_home_dir, host_log_dir, host_config_dir, host_pid_dir]:
  136. if not os.path.isdir(dir):
  137. print "Creating dir %s" % (dir)
  138. os.makedirs(dir)
  139. # Copy config file
  140. host_config_file = os.path.join(host_config_dir, "ambari-agent.ini")
  141. if not os.path.isfile(host_config_file):
  142. print "Copying config file %s" % str(host_config_file)
  143. shutil.copyfile(self.source_config_file, host_config_file)
  144. # Create hostname.sh script to use custom FQDN for each agent.
  145. host_name_script = os.path.join(host_config_dir, "hostname.sh")
  146. self.create_host_name_script(host_name, host_name_script)
  147. # Overwrite the port and hostname
  148. config_dict = {"ping_port": host.ping_port,
  149. "hostname_script": host_name_script,
  150. "public_hostname_script": host_name_script,
  151. "logdir": host_log_dir,
  152. "piddir": host_pid_dir}
  153. self.change_config(host_config_file, config_dict)
  154. # Change /etc/hosts file by appending each hostname.
  155. self.modify_etc_hosts_file()
  156. def create_host_name_script(self, host_name, host_name_script):
  157. """
  158. Creates a shell script that will echo the given hostname.
  159. :param host_name: Host name to echo
  160. :param host_name_script: Location to save the scrip to
  161. """
  162. template = "#!/bin/sh\n" \
  163. "echo HOSTNAME"
  164. with open(str(host_name_script), "w+") as f:
  165. f.writelines(template.replace("HOSTNAME", host_name))
  166. subprocess.call("chmod +x %s" % host_name_script, shell=True)
  167. def change_config(self, config_file, config_dict):
  168. """
  169. Change existing configs. Will not append new configs.
  170. :param config_file: Config file to modify
  171. :param config_dict: Dictionary of config,value pairs to change.
  172. """
  173. # TODO, allow appending configs to [AGENT] section.
  174. if not os.path.exists(config_file):
  175. print "ERROR. Did not file config file: %s" % config_file
  176. return
  177. lines = []
  178. with open(config_file, "r") as f:
  179. lines = f.readlines()
  180. new_lines = []
  181. configs_found = set()
  182. configs_changed = set()
  183. for line in lines:
  184. for config, value in config_dict.iteritems():
  185. p = re.compile(config + "\s?=")
  186. if p.match(line):
  187. configs_found.add(config)
  188. new_value = config + "=" + str(value) + "\n"
  189. if line != new_value:
  190. line = new_value
  191. configs_changed.add(config)
  192. continue
  193. # Config didn't change value
  194. new_lines.append(line)
  195. # TODO, if can append configs, then this is not needed.
  196. if len(configs_found) < len(config_dict.keys()):
  197. missing_configs = set(config_dict.keys()) - configs_found
  198. print "ERROR: Did not find all required configs. Missing: %s" % ", ".join(missing_configs)
  199. sys.exit(-1)
  200. if len(configs_changed) > 0:
  201. print "Making changes to file %s" % config_file
  202. with open(config_file, "w") as f:
  203. f.writelines(new_lines)
  204. def modify_etc_hosts_file(self):
  205. """
  206. Modify this host's /etc/hosts file by changing the line for localhost with synonyms for all of the other
  207. fake host names that will be generated for the Ambari Agents.
  208. """
  209. etc_hosts = "/etc/hosts"
  210. if not os.path.isfile(etc_hosts):
  211. print "ERROR. Did not find file %s" % etc_hosts
  212. return
  213. lines = []
  214. with open(etc_hosts, "r") as f:
  215. lines = f.readlines()
  216. # Value to search for when using Vagrant VMs
  217. localhost_line_start = "127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 "
  218. new_lines = []
  219. line_changed = False
  220. for line in lines:
  221. if line.startswith("127.0.0.1"):
  222. new_change = localhost_line_start + " ".join([x.host_name for x in self.hosts]) + "\n"
  223. if line != new_change:
  224. line = new_change
  225. line_changed = True
  226. new_lines.append(line)
  227. if line_changed:
  228. print "Making changes to %s" % etc_hosts
  229. with open(etc_hosts, "w") as f:
  230. f.writelines(new_lines)
  231. def run(self):
  232. """
  233. Run one of the supported commands: start, stop, restart, and status
  234. """
  235. if self.command == "start":
  236. self.cmd_start()
  237. elif self.command == "stop":
  238. self.cmd_stop()
  239. elif self.command == "restart":
  240. self.cmd_restart()
  241. elif self.command == "status":
  242. self.cmd_status()
  243. def cmd_start(self):
  244. print "Starting %d host(s)" % len(self.hosts)
  245. for host in self.hosts:
  246. cmd = "ambari-agent start --home %s" % (host.home_dir)
  247. subprocess.call(cmd, shell=True)
  248. def cmd_stop(self):
  249. print "Stopping %d host(s)" % len(self.hosts)
  250. for host in self.hosts:
  251. cmd = "ambari-agent stop --home %s" % (host.home_dir)
  252. subprocess.call(cmd, shell=True)
  253. def cmd_restart(self):
  254. print "Restarting %d host(s)" % len(self.hosts)
  255. for host in self.hosts:
  256. cmd = "ambari-agent restart --home %s" % (host.home_dir)
  257. subprocess.call(cmd, shell=True)
  258. def cmd_status(self):
  259. print "Summary of Agent Status:"
  260. print "Total agents: %d\n" % len(self.hosts)
  261. (running_hosts, unknown_hosts, stopped_hosts) = self.aggregate_status()
  262. print "Running agents: %d" % len(running_hosts)
  263. if self.verbose and len(running_hosts):
  264. print "(%s)\n" % (", ".join(running_hosts))
  265. print "Unknown agents: %d" % len(unknown_hosts)
  266. if self.verbose and len(unknown_hosts):
  267. print "(%s)\n" % (", ".join(unknown_hosts))
  268. print "Stopped agents: %d" % len(stopped_hosts)
  269. if self.verbose and len(stopped_hosts):
  270. print "(%s)\n" % (", ".join(stopped_hosts))
  271. def aggregate_status(self):
  272. """
  273. Aggregate the status of all of the hosts.
  274. :return: Return a 3-tuple of (list of x, list of y, list of z)
  275. x = hosts running with a valid pid
  276. y = hosts with a pid file but process is not running
  277. z = hosts without a pid file
  278. """
  279. running_hosts = []
  280. unknown_hosts = []
  281. stopped_hosts = []
  282. for host in self.hosts:
  283. pid_file = os.path.join(self.home_dir, host.host_name, self.pid_file.lstrip("/"))
  284. if os.path.isfile(pid_file):
  285. pid = None
  286. with open(pid_file, "r") as f:
  287. try:
  288. line = f.readline()
  289. pid = int(line.strip())
  290. except:
  291. pass
  292. is_running = Multiplier.check_pid(pid)
  293. if is_running:
  294. running_hosts.append(host.host_name)
  295. else:
  296. unknown_hosts.append(host.host_name)
  297. else:
  298. stopped_hosts.append(host.host_name)
  299. return (running_hosts, unknown_hosts, stopped_hosts)
  300. @classmethod
  301. def check_pid(cls, pid):
  302. """ Check For the existence of a unix pid. """
  303. try:
  304. os.kill(pid, 0)
  305. except OSError:
  306. return False
  307. else:
  308. return True
  309. if __name__ == "__main__":
  310. m = Multiplier(sys.argv)
  311. m.run()