123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382 |
- # Licensed to the Apache Software Foundation (ASF) under one or more
- # contributor license agreements. See the NOTICE file distributed with
- # this work for additional information regarding copyright ownership.
- # The ASF licenses this file to You under the Apache License, Version 2.0
- # (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import sys
- import os
- import re
- import subprocess
- import shutil
- from optparse import OptionParser
class Host:
    """
    Value holder for the settings that distinguish one Ambari Agent from the
    others running on this VM.

    The constructor stores its three arguments unchanged as the attributes
    host_name, ping_port and home_dir.
    """

    def __init__(self, host_name, ping_port, home_dir):
        # A single tuple assignment keeps the three per-agent fields together.
        self.host_name, self.ping_port, self.home_dir = host_name, ping_port, home_dir
class Multiplier:
    """
    In order to perform scale testing, this class bootstraps multiple Ambari Agents to run on the same host.
    Each Ambari Agent has its own home directory with subdirectories for configs, logs, etc.
    Further, each agent is given a unique port number (base_ping_port + agent index).

    Usage: python agent-multiplier.py [command]
    [command] = start | stop | restart | status

    Optional flags:
    -v --verbose : Increase logging
    """
    CONFIG_FILE = "/etc/ambari-agent/conf/agent-multiplier.conf"

    # Commands accepted on the command line, in the order they are reported to the user.
    VALID_COMMANDS = ("start", "stop", "restart", "status")

    def __init__(self, args):
        """
        Parse the command line and the multiplier config file, validate both,
        build the list of hosts and bootstrap each of them.

        :param args: Full argument vector including the program name (e.g. sys.argv).
        Exits with code -1 if the config file is missing, the number of positional
        arguments is wrong, or validation fails.
        """
        parser = OptionParser()
        parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False,
                          help="Verbose logging")
        (options, args) = parser.parse_args(args)
        self.verbose = options.verbose

        self.home_dir = "/home/"

        # Subdirectories to create inside the home dir for the given agent.
        self.log_dir = "/var/log/ambari-agent"
        self.config_dir = "/etc/ambari-agent/conf"
        self.pid_file = "/var/run/ambari-agent/ambari-agent.pid"
        self.prefix_dir = "/var/lib/ambari-agent/data"

        # Ambari Agent config file to use as a template
        # Will change hostname and port after copying
        self.source_config_file = "/etc/ambari-agent/conf/ambari-agent.ini"
        self.source_version_file = "/var/lib/ambari-agent/data/version"
        self.base_ping_port = 5000

        self.start = 0
        self.num = 0
        self.prefix = None

        # Parse above params from config file, which must exist
        self.parse_configs()

        # args still contains the program name, so exactly one command means len == 2.
        if len(args) != 2:
            print("Sample Usage: python agent_multiplier.py [action]\n"
                  "actions: start|stop|restart|status")
            # Without this exit the lookup of args[1] below fails with an IndexError.
            sys.exit(-1)
        self.command = args[1]

        # Validate configs
        self.validate()

        print("*** Params ***")
        print("Start: %d" % self.start)
        print("Num: %d" % self.num)
        print("Prefix: %s" % self.prefix)
        print("Command: %s" % self.command)

        # All hostnames that will be managed by Ambari Agents on this host
        self.hosts = []
        for i in range(self.start, self.start + self.num):
            host_name = "%s-%04d" % (self.prefix, i)
            host_home_dir = os.path.join(self.home_dir, host_name)
            host = Host(host_name, self.base_ping_port + i, host_home_dir)
            self.hosts.append(host)

        self.bootstrap()

    def parse_configs(self):
        """
        Parse the configuration file to set the config params.

        Every line containing "=" is split at the first "=" into a key and a value
        (both stripped). The keys "start" and "num" are converted to ints and
        "prefix" is stripped and lower-cased; all other keys are ignored.
        Exits with code -1 if the file does not exist or an integer value is malformed.
        """
        if not os.path.exists(self.CONFIG_FILE):
            print("Did not find Agent Multiplier config file: %s" % str(self.CONFIG_FILE))
            sys.exit(-1)

        params = {}
        with open(self.CONFIG_FILE, "r") as f:
            for line in f:
                config, separator, value = line.partition("=")
                if separator:
                    params[config.strip()] = value.strip()

        # Convert some configs to ints
        for name in ("start", "num"):
            if name in params:
                try:
                    setattr(self, name, int(params[name]))
                except ValueError:
                    # Report a readable error instead of an unhandled traceback.
                    print("Config '%s' in %s must be an integer but was: %s" %
                          (name, str(self.CONFIG_FILE), params[name]))
                    sys.exit(-1)

        if "prefix" in params:
            self.prefix = params["prefix"].strip().lower()

    def validate(self):
        """
        Validate the configs are non-empty and contain correct ranges.
        On error, will print every problem found and exit with code -1
        """
        errors = []
        if self.start <= 0:
            errors.append("Start must be a positive integer")
        if self.num <= 0:
            errors.append("Number of agents on this host must be a positive integer")
        if self.prefix is None or self.prefix.strip() == "":
            errors.append("Prefix is a required field")

        if not os.path.isfile(self.source_config_file):
            errors.append("Ambari Agent config file does not exist at %s" % self.source_config_file)

        if self.command is None or self.command not in self.VALID_COMMANDS:
            # A tuple keeps the order of the listed commands stable between runs.
            errors.append("Command must be one of %s" % ", ".join(self.VALID_COMMANDS))

        if errors:
            print("Error:")
            print("\n".join(errors))
            sys.exit(-1)

    def bootstrap(self):
        """
        Bootstrap each Ambari Agent that will run on this host with the directories it needs and configuration file.

        For every host: create the missing directories, copy the template config and
        version files if the host does not have them yet, (re)write its hostname.sh
        script and rewrite the per-host values in its config file. Finally update the
        hosts file once for all hosts.
        """
        for host in self.hosts:
            host_name = host.host_name
            host_home_dir = host.home_dir
            # The configured sub paths are absolute ("/var/log/..."), so they are appended
            # to the home dir by plain concatenation rather than os.path.join.
            host_log_dir = host_home_dir + self.log_dir
            host_config_dir = host_home_dir + self.config_dir
            host_pid_file = host_home_dir + self.pid_file
            host_pid_dir = os.path.dirname(host_pid_file)
            host_prefix = host_home_dir + self.prefix_dir

            if self.verbose:
                print("Analyzing host %s with port %d" % (host_name, host.ping_port))

            for directory in [host_home_dir, host_log_dir, host_config_dir, host_pid_dir, host_prefix]:
                if not os.path.isdir(directory):
                    print("Creating dir %s" % (directory))
                    os.makedirs(directory)

            # Copy config file
            host_config_file = os.path.join(host_config_dir, "ambari-agent.ini")
            if not os.path.isfile(host_config_file):
                print("Copying config file %s" % str(host_config_file))
                shutil.copyfile(self.source_config_file, host_config_file)

            # Copy version file
            version_file = os.path.join(host_prefix, "version")
            if not os.path.isfile(version_file):
                print("Copying version file %s" % str(version_file))
                shutil.copyfile(self.source_version_file, version_file)

            # Create hostname.sh script to use custom FQDN for each agent.
            host_name_script = os.path.join(host_config_dir, "hostname.sh")
            self.create_host_name_script(host_name, host_name_script)

            # Overwrite the port and hostname
            config_dict = {"ping_port": host.ping_port,
                           "hostname_script": host_name_script,
                           "public_hostname_script": host_name_script,
                           "logdir": host_log_dir,
                           "piddir": host_pid_dir,
                           "prefix": host_prefix}
            self.change_config(host_config_file, config_dict)

        # Change /etc/hosts file by appending each hostname.
        self.modify_etc_hosts_file()

    def create_host_name_script(self, host_name, host_name_script):
        """
        Creates a shell script that will echo the given hostname and marks it executable.
        :param host_name: Host name to echo
        :param host_name_script: Location to save the script to
        """
        template = "#!/bin/sh\n" \
                   "echo HOSTNAME"
        with open(str(host_name_script), "w+") as f:
            f.write(template.replace("HOSTNAME", host_name))
        # Add the execute bits directly instead of spawning "chmod" through a shell,
        # which would break on paths containing spaces or shell metacharacters.
        current_mode = os.stat(host_name_script).st_mode
        os.chmod(host_name_script, current_mode | 0o111)

    def change_config(self, config_file, config_dict):
        """
        Change existing configs. Will not append new configs.

        A line is treated as the definition of a config when it starts with the config
        name followed by optional whitespace and "="; it is then replaced by
        "name=value". The file is only rewritten if at least one line changed.
        Exits with code -1 if any config in config_dict is not present in the file.

        :param config_file: Config file to modify
        :param config_dict: Dictionary of config,value pairs to change.
        """
        # TODO, allow appending configs to [AGENT] section.

        if not os.path.exists(config_file):
            print("ERROR. Did not find config file: %s" % config_file)
            return

        with open(config_file, "r") as f:
            lines = f.readlines()

        # Compile one pattern per config up front instead of once per line and config.
        # The name is escaped so that it is always matched literally.
        patterns = dict((config, re.compile(re.escape(config) + r"\s*="))
                        for config in config_dict)

        new_lines = []
        configs_found = set()
        configs_changed = set()
        for line in lines:
            for config, pattern in patterns.items():
                if pattern.match(line):
                    configs_found.add(config)
                    new_value = config + "=" + str(config_dict[config]) + "\n"
                    if line != new_value:
                        line = new_value
                        configs_changed.add(config)
                    # A line defines at most one config, so stop looking.
                    break
            new_lines.append(line)

        # TODO, if can append configs, then this is not needed.
        missing_configs = set(config_dict) - configs_found
        if missing_configs:
            print("ERROR: Did not find all required configs. Missing: %s" %
                  ", ".join(sorted(missing_configs)))
            sys.exit(-1)

        if configs_changed:
            print("Making changes to file %s" % config_file)
            with open(config_file, "w") as f:
                f.writelines(new_lines)

    def modify_etc_hosts_file(self, etc_hosts="/etc/hosts"):
        """
        Modify this host's /etc/hosts file by changing the line for localhost with synonyms for all of the other
        fake host names that will be generated for the Ambari Agents.

        Every line whose first field is exactly 127.0.0.1 is replaced. The file is only
        rewritten if at least one line actually changed.

        :param etc_hosts: Path of the hosts file to modify; defaults to /etc/hosts.
        """
        if not os.path.isfile(etc_hosts):
            print("ERROR. Did not find file %s" % etc_hosts)
            return

        with open(etc_hosts, "r") as f:
            lines = f.readlines()

        # Value to search for when using Vagrant VMs
        localhost_line_start = "127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 "
        # The replacement does not depend on the line being inspected, so build it once.
        new_change = localhost_line_start + " ".join([x.host_name for x in self.hosts]) + "\n"

        new_lines = []
        line_changed = False
        for line in lines:
            # Compare the whole first field so that e.g. 127.0.0.10 is left alone.
            if line.split()[:1] == ["127.0.0.1"] and line != new_change:
                line = new_change
                line_changed = True
            new_lines.append(line)

        if line_changed:
            print("Making changes to %s" % etc_hosts)
            with open(etc_hosts, "w") as f:
                f.writelines(new_lines)

    def run(self):
        """
        Run one of the supported commands: start, stop, restart, and status.
        Any other command is silently ignored.
        """
        handlers = {"start": self.cmd_start,
                    "stop": self.cmd_stop,
                    "restart": self.cmd_restart,
                    "status": self.cmd_status}
        handler = handlers.get(self.command)
        if handler is not None:
            handler()

    def _run_agent_command(self, action):
        """
        Invoke "ambari-agent <action> --home <home dir>" once for every host.
        :param action: ambari-agent sub command, e.g. start, stop or restart
        """
        for host in self.hosts:
            # An argument list avoids shell interpretation of the home directory.
            subprocess.call(["ambari-agent", action, "--home", host.home_dir])

    def cmd_start(self):
        """ Start the Ambari Agent of every host. """
        print("Starting %d host(s)" % len(self.hosts))
        self._run_agent_command("start")

    def cmd_stop(self):
        """ Stop the Ambari Agent of every host. """
        print("Stopping %d host(s)" % len(self.hosts))
        self._run_agent_command("stop")

    def cmd_restart(self):
        """ Restart the Ambari Agent of every host. """
        print("Restarting %d host(s)" % len(self.hosts))
        self._run_agent_command("restart")

    def cmd_status(self):
        """
        Print how many agents are running, unknown and stopped.
        With the verbose flag the host names of each non-empty group are printed too.
        """
        print("Summary of Agent Status:")
        print("Total agents: %d\n" % len(self.hosts))
        (running_hosts, unknown_hosts, stopped_hosts) = self.aggregate_status()

        print("Running agents: %d" % len(running_hosts))
        if self.verbose and len(running_hosts):
            print("(%s)\n" % (", ".join(running_hosts)))

        print("Unknown agents: %d" % len(unknown_hosts))
        if self.verbose and len(unknown_hosts):
            print("(%s)\n" % (", ".join(unknown_hosts)))

        print("Stopped agents: %d" % len(stopped_hosts))
        if self.verbose and len(stopped_hosts):
            print("(%s)\n" % (", ".join(stopped_hosts)))

    def aggregate_status(self):
        """
        Aggregate the status of all of the hosts.
        :return: Return a 3-tuple of (list of x, list of y, list of z)
        x = hosts running with a valid pid
        y = hosts with a pid file but process is not running (or the pid file is unreadable/malformed)
        z = hosts without a pid file
        """
        running_hosts = []
        unknown_hosts = []
        stopped_hosts = []
        for host in self.hosts:
            pid_file = os.path.join(self.home_dir, host.host_name, self.pid_file.lstrip("/"))
            if not os.path.isfile(pid_file):
                stopped_hosts.append(host.host_name)
                continue

            pid = None
            try:
                with open(pid_file, "r") as f:
                    pid = int(f.readline().strip())
            except (ValueError, IOError):
                # Leave pid as None; check_pid reports it as not running, so the
                # host is counted as unknown rather than aborting the whole summary.
                pass

            if Multiplier.check_pid(pid):
                running_hosts.append(host.host_name)
            else:
                unknown_hosts.append(host.host_name)

        return (running_hosts, unknown_hosts, stopped_hosts)

    @classmethod
    def check_pid(cls, pid):
        """
        Check For the existence of a unix pid.
        :param pid: Process id to probe; None, non-numeric and non-positive values yield False.
        :return: True if a process with that pid exists, False otherwise.
        """
        import errno

        try:
            pid = int(pid)
        except (TypeError, ValueError):
            return False
        # 0 and negative values address process groups in kill(2), not a single process.
        if pid <= 0:
            return False

        try:
            # Signal 0 performs the existence and permission checks without sending a signal.
            os.kill(pid, 0)
        except OverflowError:
            # Value does not fit into a pid, so no such process can exist.
            return False
        except OSError as e:
            # EPERM means the process exists but belongs to another user.
            return e.errno == errno.EPERM
        return True
if __name__ == "__main__":
    # Script entry point: build the multiplier from the raw command line
    # (program name included) and dispatch the requested command.
    Multiplier(sys.argv).run()
|