# agent-multiplier.py

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import errno
import os
import re
import shutil
import subprocess
import sys
from optparse import OptionParser


  21. class Host:
  22. """
  23. Abstraction of the elements unique to each Ambari Agent running on this VM.
  24. """
  25. def __init__(self, host_name, ping_port, home_dir):
  26. self.host_name = host_name
  27. self.ping_port = ping_port
  28. self.home_dir = home_dir
  29. class Multiplier:
  30. """
  31. In order to perform scale testing, this class bootstraps multiple Ambari Agents to run on the same host.
  32. Each Ambari Agent has its own home directory with subdirectories for configs, logs, etc.
  33. Further, each agent is given a unique port number.
  34. Usage: python agent-multiplier.py [command]
  35. [command] = start | stop | restart | status
  36. Optional flags:
  37. -v --verbose : Increase logging
  38. """
  39. CONFIG_FILE = "/etc/ambari-agent/conf/agent-multiplier.conf"
  40. def __init__(self, args):
  41. parser = OptionParser()
  42. parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False,
  43. help="Verbose logging")
  44. (options, args) = parser.parse_args(args)
  45. self.verbose = options.verbose
  46. self.home_dir = "/home/"
  47. # Subdirectories to create inside the home dir for the given agent.
  48. self.log_dir = "/var/log/ambari-agent"
  49. self.config_dir = "/etc/ambari-agent/conf"
  50. self.pid_file = "/var/run/ambari-agent/ambari-agent.pid"
  51. self.prefix_dir = "/var/lib/ambari-agent/data"
  52. # Ambari Agent config file to use as a template
  53. # Will change hostname and port after copying
  54. self.source_config_file = "/etc/ambari-agent/conf/ambari-agent.ini"
  55. self.source_version_file = "/var/lib/ambari-agent/data/version"
  56. self.base_ping_port = 5000
  57. self.start = 0
  58. self.num = 0
  59. self.prefix = None
  60. # Parse above params from config file, which must exist
  61. self.parse_configs()
  62. if len(args) != 2:
  63. print "Sample Usage: python agent_multiplier.py [action]\n" \
  64. "actions: start|stop|restart|status"
  65. self.command = args[1]
  66. # Validate configs
  67. self.validate()
  68. print "*** Params ***"
  69. print "Start: %d" % self.start
  70. print "Num: %d" % self.num
  71. print "Prefix: %s" % self.prefix
  72. print "Command: %s" % self.command
  73. # All hostnames that will be managed by Ambari Agents on this host
  74. self.hosts = []
  75. for i in range(self.start, self.start + self.num):
  76. host_name = "%s-%04d" % (self.prefix, i)
  77. host_home_dir = os.path.join(self.home_dir, host_name)
  78. host = Host(host_name, self.base_ping_port + i, host_home_dir)
  79. self.hosts.append(host)
  80. self.bootstrap()
  81. def parse_configs(self):
  82. """
  83. Parse the configuration file to set the config params.
  84. """
  85. if not os.path.exists(self.CONFIG_FILE):
  86. print "Did not find Agent Multiplier config file: %s" % str(self.CONFIG_FILE)
  87. sys.exit(-1)
  88. params = {}
  89. with open(self.CONFIG_FILE, "r") as f:
  90. for line in f.readlines():
  91. index = line.index("=") if "=" in line else None
  92. if index is not None:
  93. config = line[0:index].strip()
  94. value = line[index+1:].strip()
  95. params[config] = value
  96. # Convert some configs to ints
  97. if "start" in params:
  98. self.start = int(params["start"])
  99. if "num" in params:
  100. self.num = int(params["num"])
  101. if "prefix" in params:
  102. self.prefix = params["prefix"].strip().lower()
  103. def validate(self):
  104. """
  105. Validate the configs are non-empty and contain correct ranges.
  106. On error, will exit with code -1
  107. """
  108. errors = []
  109. if self.start <= 0:
  110. errors.append("Start must be a positive integer")
  111. if self.num <= 0:
  112. errors.append("Number of agents on this host must be a positive integer")
  113. if self.prefix is None or self.prefix.strip() == "":
  114. errors.append("Prefix is a required field")
  115. if not os.path.isfile(self.source_config_file):
  116. errors.append("Ambari Agent config file does not exist at %s" % self.source_config_file)
  117. valid_commands = set(["start", "stop", "restart", "status"])
  118. if self.command is None or self.command not in valid_commands:
  119. errors.append("Command must be one of %s" % ", ".join(valid_commands))
  120. if len(errors) > 0:
  121. print "Error:"
  122. print "\n".join(errors)
  123. sys.exit(-1)
  124. def bootstrap(self):
  125. """
  126. Bootstrap each Ambari Agent that will run on this host with the directories it needs and configuration file.
  127. """
  128. for host in self.hosts:
  129. host_name = host.host_name
  130. host_home_dir = host.home_dir
  131. host_log_dir = host_home_dir + self.log_dir
  132. host_config_dir = host_home_dir + self.config_dir
  133. host_pid_file = host_home_dir + self.pid_file
  134. host_pid_dir = os.path.dirname(host_pid_file)
  135. host_prefix = host_home_dir + self.prefix_dir
  136. if self.verbose:
  137. print "Analyzing host %s with port %d" % (host_name, host.ping_port)
  138. for dir in [host_home_dir, host_log_dir, host_config_dir, host_pid_dir, host_prefix]:
  139. if not os.path.isdir(dir):
  140. print "Creating dir %s" % (dir)
  141. os.makedirs(dir)
  142. # Copy config file
  143. host_config_file = os.path.join(host_config_dir, "ambari-agent.ini")
  144. if not os.path.isfile(host_config_file):
  145. print "Copying config file %s" % str(host_config_file)
  146. shutil.copyfile(self.source_config_file, host_config_file)
  147. # Copy version file
  148. version_file = os.path.join(host_prefix, "version")
  149. if not os.path.isfile(version_file):
  150. print "Copying version file %s" % str(version_file)
  151. shutil.copyfile(self.source_version_file, version_file)
  152. # Create hostname.sh script to use custom FQDN for each agent.
  153. host_name_script = os.path.join(host_config_dir, "hostname.sh")
  154. self.create_host_name_script(host_name, host_name_script)
  155. # Overwrite the port and hostname
  156. config_dict = {"ping_port": host.ping_port,
  157. "hostname_script": host_name_script,
  158. "public_hostname_script": host_name_script,
  159. "logdir": host_log_dir,
  160. "piddir": host_pid_dir,
  161. "prefix": host_prefix}
  162. self.change_config(host_config_file, config_dict)
  163. # Change /etc/hosts file by appending each hostname.
  164. self.modify_etc_hosts_file()
  165. def create_host_name_script(self, host_name, host_name_script):
  166. """
  167. Creates a shell script that will echo the given hostname.
  168. :param host_name: Host name to echo
  169. :param host_name_script: Location to save the scrip to
  170. """
  171. template = "#!/bin/sh\n" \
  172. "echo HOSTNAME"
  173. with open(str(host_name_script), "w+") as f:
  174. f.writelines(template.replace("HOSTNAME", host_name))
  175. subprocess.call("chmod +x %s" % host_name_script, shell=True)
  176. def change_config(self, config_file, config_dict):
  177. """
  178. Change existing configs. Will not append new configs.
  179. :param config_file: Config file to modify
  180. :param config_dict: Dictionary of config,value pairs to change.
  181. """
  182. # TODO, allow appending configs to [AGENT] section.
  183. if not os.path.exists(config_file):
  184. print "ERROR. Did not file config file: %s" % config_file
  185. return
  186. lines = []
  187. with open(config_file, "r") as f:
  188. lines = f.readlines()
  189. new_lines = []
  190. configs_found = set()
  191. configs_changed = set()
  192. for line in lines:
  193. for config, value in config_dict.iteritems():
  194. p = re.compile(config + "\s?=")
  195. if p.match(line):
  196. configs_found.add(config)
  197. new_value = config + "=" + str(value) + "\n"
  198. if line != new_value:
  199. line = new_value
  200. configs_changed.add(config)
  201. continue
  202. # Config didn't change value
  203. new_lines.append(line)
  204. # TODO, if can append configs, then this is not needed.
  205. if len(configs_found) < len(config_dict.keys()):
  206. missing_configs = set(config_dict.keys()) - configs_found
  207. print "ERROR: Did not find all required configs. Missing: %s" % ", ".join(missing_configs)
  208. sys.exit(-1)
  209. if len(configs_changed) > 0:
  210. print "Making changes to file %s" % config_file
  211. with open(config_file, "w") as f:
  212. f.writelines(new_lines)
  213. def modify_etc_hosts_file(self):
  214. """
  215. Modify this host's /etc/hosts file by changing the line for localhost with synonyms for all of the other
  216. fake host names that will be generated for the Ambari Agents.
  217. """
  218. etc_hosts = "/etc/hosts"
  219. if not os.path.isfile(etc_hosts):
  220. print "ERROR. Did not find file %s" % etc_hosts
  221. return
  222. lines = []
  223. with open(etc_hosts, "r") as f:
  224. lines = f.readlines()
  225. # Value to search for when using Vagrant VMs
  226. localhost_line_start = "127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 "
  227. new_lines = []
  228. line_changed = False
  229. for line in lines:
  230. if line.startswith("127.0.0.1"):
  231. new_change = localhost_line_start + " ".join([x.host_name for x in self.hosts]) + "\n"
  232. if line != new_change:
  233. line = new_change
  234. line_changed = True
  235. new_lines.append(line)
  236. if line_changed:
  237. print "Making changes to %s" % etc_hosts
  238. with open(etc_hosts, "w") as f:
  239. f.writelines(new_lines)
  240. def run(self):
  241. """
  242. Run one of the supported commands: start, stop, restart, and status
  243. """
  244. if self.command == "start":
  245. self.cmd_start()
  246. elif self.command == "stop":
  247. self.cmd_stop()
  248. elif self.command == "restart":
  249. self.cmd_restart()
  250. elif self.command == "status":
  251. self.cmd_status()
  252. def cmd_start(self):
  253. print "Starting %d host(s)" % len(self.hosts)
  254. for host in self.hosts:
  255. cmd = "ambari-agent start --home %s" % (host.home_dir)
  256. subprocess.call(cmd, shell=True)
  257. def cmd_stop(self):
  258. print "Stopping %d host(s)" % len(self.hosts)
  259. for host in self.hosts:
  260. cmd = "ambari-agent stop --home %s" % (host.home_dir)
  261. subprocess.call(cmd, shell=True)
  262. def cmd_restart(self):
  263. print "Restarting %d host(s)" % len(self.hosts)
  264. for host in self.hosts:
  265. cmd = "ambari-agent restart --home %s" % (host.home_dir)
  266. subprocess.call(cmd, shell=True)
  267. def cmd_status(self):
  268. print "Summary of Agent Status:"
  269. print "Total agents: %d\n" % len(self.hosts)
  270. (running_hosts, unknown_hosts, stopped_hosts) = self.aggregate_status()
  271. print "Running agents: %d" % len(running_hosts)
  272. if self.verbose and len(running_hosts):
  273. print "(%s)\n" % (", ".join(running_hosts))
  274. print "Unknown agents: %d" % len(unknown_hosts)
  275. if self.verbose and len(unknown_hosts):
  276. print "(%s)\n" % (", ".join(unknown_hosts))
  277. print "Stopped agents: %d" % len(stopped_hosts)
  278. if self.verbose and len(stopped_hosts):
  279. print "(%s)\n" % (", ".join(stopped_hosts))
  280. def aggregate_status(self):
  281. """
  282. Aggregate the status of all of the hosts.
  283. :return: Return a 3-tuple of (list of x, list of y, list of z)
  284. x = hosts running with a valid pid
  285. y = hosts with a pid file but process is not running
  286. z = hosts without a pid file
  287. """
  288. running_hosts = []
  289. unknown_hosts = []
  290. stopped_hosts = []
  291. for host in self.hosts:
  292. pid_file = os.path.join(self.home_dir, host.host_name, self.pid_file.lstrip("/"))
  293. if os.path.isfile(pid_file):
  294. pid = None
  295. with open(pid_file, "r") as f:
  296. try:
  297. line = f.readline()
  298. pid = int(line.strip())
  299. except:
  300. pass
  301. is_running = Multiplier.check_pid(pid)
  302. if is_running:
  303. running_hosts.append(host.host_name)
  304. else:
  305. unknown_hosts.append(host.host_name)
  306. else:
  307. stopped_hosts.append(host.host_name)
  308. return (running_hosts, unknown_hosts, stopped_hosts)
  309. @classmethod
  310. def check_pid(cls, pid):
  311. """ Check For the existence of a unix pid. """
  312. try:
  313. os.kill(pid, 0)
  314. except OSError:
  315. return False
  316. else:
  317. return True
  318. if __name__ == "__main__":
  319. m = Multiplier(sys.argv)
  320. m.run()