#!/usr/bin/env python

'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
  17. import logging.handlers
  18. import signal
  19. from optparse import OptionParser
  20. import sys
  21. import traceback
  22. import getpass
  23. import os
  24. import time
  25. import platform
  26. import ConfigParser
  27. import ProcessHelper
  28. from Controller import Controller
  29. import AmbariConfig
  30. from NetUtil import NetUtil
  31. from PingPortListener import PingPortListener
  32. import hostname
  33. from DataCleaner import DataCleaner
  34. import socket
  35. from ambari_agent import shell
  36. logger = logging.getLogger()
  37. formatstr = "%(levelname)s %(asctime)s %(filename)s:%(lineno)d - %(message)s"
  38. agentPid = os.getpid()
  39. config = AmbariConfig.AmbariConfig()
  40. configFile = config.CONFIG_FILE
  41. two_way_ssl_property = config.TWO_WAY_SSL_PROPERTY
  42. IS_WINDOWS = platform.system() == "Windows"
  43. if IS_WINDOWS:
  44. from HeartbeatHandlers_windows import bind_signal_handlers
  45. else:
  46. from HeartbeatStopHandler_linux import bind_signal_handlers
  47. from HeartbeatStopHandler_linux import signal_handler
  48. from HeartbeatStopHandler_linux import debug
  49. def setup_logging(verbose):
  50. formatter = logging.Formatter(formatstr)
  51. rotateLog = logging.handlers.RotatingFileHandler(AmbariConfig.AmbariConfig.getLogFile(), "a", 10000000, 25)
  52. rotateLog.setFormatter(formatter)
  53. logger.addHandler(rotateLog)
  54. if verbose:
  55. logging.basicConfig(format=formatstr, level=logging.DEBUG, filename=AmbariConfig.AmbariConfig.getLogFile())
  56. logger.setLevel(logging.DEBUG)
  57. logger.info("loglevel=logging.DEBUG")
  58. else:
  59. logging.basicConfig(format=formatstr, level=logging.INFO, filename=AmbariConfig.AmbariConfig.getLogFile())
  60. logger.setLevel(logging.INFO)
  61. logger.info("loglevel=logging.INFO")
  62. def update_log_level(config):
  63. # Setting loglevel based on config file
  64. try:
  65. loglevel = config.get('agent', 'loglevel')
  66. if loglevel is not None:
  67. if loglevel == 'DEBUG':
  68. logging.basicConfig(format=formatstr, level=logging.DEBUG, filename=AmbariConfig.AmbariConfig.getLogFile())
  69. logger.setLevel(logging.DEBUG)
  70. logger.info("Newloglevel=logging.DEBUG")
  71. else:
  72. logging.basicConfig(format=formatstr, level=logging.INFO, filename=AmbariConfig.AmbariConfig.getLogFile())
  73. logger.setLevel(logging.INFO)
  74. logger.debug("Newloglevel=logging.INFO")
  75. except Exception, err:
  76. logger.info("Default loglevel=DEBUG")
  77. # ToDo: move that function inside AmbariConfig
  78. def resolve_ambari_config():
  79. global config
  80. configPath = os.path.abspath(AmbariConfig.AmbariConfig.getConfigFile())
  81. try:
  82. if os.path.exists(configPath):
  83. config.read(configPath)
  84. else:
  85. raise Exception("No config found at {0}, use default".format(configPath))
  86. except Exception, err:
  87. logger.warn(err)
  88. def perform_prestart_checks(expected_hostname):
  89. # Check if current hostname is equal to expected one (got from the server
  90. # during bootstrap.
  91. global config
  92. if expected_hostname is not None:
  93. current_hostname = hostname.hostname(config)
  94. if current_hostname != expected_hostname:
  95. print("Determined hostname does not match expected. Please check agent "
  96. "log for details")
  97. msg = "Ambari agent machine hostname ({0}) does not match expected ambari " \
  98. "server hostname ({1}). Aborting registration. Please check hostname, " \
  99. "hostname -f and /etc/hosts file to confirm your " \
  100. "hostname is setup correctly".format(current_hostname, expected_hostname)
  101. logger.error(msg)
  102. sys.exit(1)
  103. # Check if there is another instance running
  104. if os.path.isfile(ProcessHelper.pidfile) and not IS_WINDOWS:
  105. print("%s already exists, exiting" % ProcessHelper.pidfile)
  106. sys.exit(1)
  107. # check if ambari prefix exists
  108. elif config.has_option('agent', 'prefix') and not os.path.isdir(os.path.abspath(config.get('agent', 'prefix'))):
  109. msg = "Ambari prefix dir %s does not exists, can't continue" \
  110. % config.get("agent", "prefix")
  111. logger.error(msg)
  112. print(msg)
  113. sys.exit(1)
  114. elif not config.has_option('agent', 'prefix'):
  115. msg = "Ambari prefix dir %s not configured, can't continue"
  116. logger.error(msg)
  117. print(msg)
  118. sys.exit(1)
  119. def daemonize():
  120. # Daemonize current instance of Ambari Agent
  121. # Currently daemonization is done via /usr/sbin/ambari-agent script (nohup)
  122. # and agent only dumps self pid to file
  123. if not os.path.exists(ProcessHelper.piddir):
  124. os.makedirs(ProcessHelper.piddir, 0755)
  125. pid = str(os.getpid())
  126. file(ProcessHelper.pidfile, 'w').write(pid)
  127. def stop_agent():
  128. # stop existing Ambari agent
  129. pid = -1
  130. try:
  131. f = open(ProcessHelper.pidfile, 'r')
  132. pid = f.read()
  133. pid = int(pid)
  134. f.close()
  135. kill_process_group(pid, signal.SIGTERM)
  136. time.sleep(5)
  137. if os.path.exists(ProcessHelper.pidfile):
  138. raise Exception("PID file still exists.")
  139. os._exit(0)
  140. except Exception:
  141. if pid == -1:
  142. print ("Agent process is not running")
  143. else:
  144. kill_process_group(pid, signal.SIGKILL)
  145. os._exit(1)
  146. def kill_process_group(pid, sig):
  147. shell.shellRunner().run("sudo kill -{0} -$(ps -o pgid= {1} | grep -o [0-9]*)".format(sig, pid))
  148. def reset_agent(options):
  149. try:
  150. # update agent config file
  151. agent_config = ConfigParser.ConfigParser()
  152. agent_config.read(configFile)
  153. server_host = agent_config.get('server', 'hostname')
  154. new_host = options[2]
  155. if new_host is not None and server_host != new_host:
  156. print "Updating server host from " + server_host + " to " + new_host
  157. agent_config.set('server', 'hostname', new_host)
  158. with (open(configFile, "wb")) as new_agent_config:
  159. agent_config.write(new_agent_config)
  160. # clear agent certs
  161. agent_keysdir = agent_config.get('security', 'keysdir')
  162. print "Removing Agent certificates..."
  163. for root, dirs, files in os.walk(agent_keysdir, topdown=False):
  164. for name in files:
  165. os.remove(os.path.join(root, name))
  166. for name in dirs:
  167. os.rmdir(os.path.join(root, name))
  168. except Exception, err:
  169. print("A problem occurred while trying to reset the agent: " + str(err))
  170. os._exit(1)
  171. os._exit(0)
  172. # event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process
  173. # we need this for windows os, where no sigterm available
  174. def main(heartbeat_stop_callback=None):
  175. global config
  176. parser = OptionParser()
  177. parser.add_option("-v", "--verbose", dest="verbose", action="store_true", help="verbose log output", default=False)
  178. parser.add_option("-e", "--expected-hostname", dest="expected_hostname", action="store",
  179. help="expected hostname of current host. If hostname differs, agent will fail", default=None)
  180. (options, args) = parser.parse_args()
  181. expected_hostname = options.expected_hostname
  182. current_user = getpass.getuser()
  183. setup_logging(options.verbose)
  184. default_cfg = {'agent': {'prefix': '/home/ambari'}}
  185. config.load(default_cfg)
  186. bind_signal_handlers(agentPid)
  187. if (len(sys.argv) > 1) and sys.argv[1] == 'stop':
  188. stop_agent()
  189. if (len(sys.argv) > 2) and sys.argv[1] == 'reset':
  190. reset_agent(sys.argv)
  191. # Check for ambari configuration file.
  192. resolve_ambari_config()
  193. # Starting data cleanup daemon
  194. data_cleaner = None
  195. if config.has_option('agent', 'data_cleanup_interval') and int(config.get('agent','data_cleanup_interval')) > 0:
  196. data_cleaner = DataCleaner(config)
  197. data_cleaner.start()
  198. perform_prestart_checks(expected_hostname)
  199. if not IS_WINDOWS:
  200. daemonize()
  201. # Starting ping port listener
  202. try:
  203. ping_port_listener = PingPortListener(config)
  204. except Exception as ex:
  205. err_message = "Failed to start ping port listener of: " + str(ex)
  206. logger.error(err_message)
  207. sys.stderr.write(err_message)
  208. sys.exit(1)
  209. ping_port_listener.start()
  210. update_log_level(config)
  211. server_hostname = config.get('server', 'hostname')
  212. server_url = config.get_api_url()
  213. try:
  214. server_ip = socket.gethostbyname(server_hostname)
  215. logger.info('Connecting to Ambari server at %s (%s)', server_url, server_ip)
  216. except socket.error:
  217. logger.warn("Unable to determine the IP address of the Ambari server '%s'", server_hostname)
  218. # Wait until server is reachable
  219. netutil = NetUtil(heartbeat_stop_callback)
  220. retries, connected = netutil.try_to_connect(server_url, -1, logger)
  221. # Ambari Agent was stopped using stop event
  222. if connected:
  223. # Launch Controller communication
  224. controller = Controller(config, heartbeat_stop_callback)
  225. controller.start()
  226. controller.join()
  227. if not IS_WINDOWS:
  228. stop_agent()
  229. logger.info("finished")
  230. if __name__ == "__main__":
  231. heartbeat_stop_callback = bind_signal_handlers(agentPid)
  232. main(heartbeat_stop_callback)