bootstrap.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688
  1. #!/usr/bin/env ambari-python-wrap
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
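# On Linux, the bootstrap process is supposed to run on hosts that may have Python 2.4 and above installed (CentOS 5).
# Hence, the bootstrap code is meant to stay compatible with old Python versions rather than assume Python 2.6.
# Most notably, @-decorators and {}-format() are to be avoided. NOTE: str.format() (added in Python 2.6) is
# nevertheless used in several places in this file, so in practice the file already requires Python 2.6 or later.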
  20. import time
  21. import sys
  22. import logging
  23. import pprint
  24. import os
  25. import subprocess
  26. import threading
  27. import traceback
  28. import re
  29. from datetime import datetime
  30. from resource_management.core.shell import quote_bash_args
  31. AMBARI_PASSPHRASE_VAR_NAME = "AMBARI_PASSPHRASE"
  32. HOST_BOOTSTRAP_TIMEOUT = 300
  33. # how many parallel bootstraps may be run at a time
  34. MAX_PARALLEL_BOOTSTRAPS = 20
  35. # How many seconds to wait between polling parallel bootstraps
  36. POLL_INTERVAL_SEC = 1
  37. DEBUG = False
  38. DEFAULT_AGENT_TEMP_FOLDER = "/var/lib/ambari-agent/data/tmp"
  39. PYTHON_ENV="env PYTHONPATH=$PYTHONPATH:" + DEFAULT_AGENT_TEMP_FOLDER
  40. class HostLog:
  41. """ Provides per-host logging. """
  42. def __init__(self, log_file):
  43. self.log_file = log_file
  44. def write(self, log_text):
  45. """
  46. Writes log to file. Closes file after each write to make content accessible
  47. for poller in ambari-server
  48. """
  49. logFile = open(self.log_file, "a+")
  50. text = str(log_text)
  51. if not text.endswith("\n"):
  52. text += "\n"
  53. logFile.write(text)
  54. logFile.close()
  55. class SCP:
  56. """ SCP implementation that is thread based. The status can be returned using
  57. status val """
  58. def __init__(self, user, sshkey_file, host, inputFile, remote, bootdir, host_log):
  59. self.user = user
  60. self.sshkey_file = sshkey_file
  61. self.host = host
  62. self.inputFile = inputFile
  63. self.remote = remote
  64. self.bootdir = bootdir
  65. self.host_log = host_log
  66. pass
  67. def run(self):
  68. scpcommand = ["scp",
  69. "-r",
  70. "-o", "ConnectTimeout=60",
  71. "-o", "BatchMode=yes",
  72. "-o", "StrictHostKeyChecking=no",
  73. "-i", self.sshkey_file, self.inputFile, self.user + "@" +
  74. self.host + ":" + self.remote]
  75. if DEBUG:
  76. self.host_log.write("Running scp command " + ' '.join(scpcommand))
  77. self.host_log.write("==========================")
  78. self.host_log.write("\nCommand start time " + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
  79. scpstat = subprocess.Popen(scpcommand, stdout=subprocess.PIPE,
  80. stderr=subprocess.PIPE)
  81. log = scpstat.communicate()
  82. errorMsg = log[1]
  83. log = log[0] + "\n" + log[1]
  84. self.host_log.write(log)
  85. self.host_log.write("scp " + self.inputFile)
  86. self.host_log.write("host=" + self.host + ", exitcode=" + str(scpstat.returncode) )
  87. self.host_log.write("Command end time " + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
  88. return {"exitstatus": scpstat.returncode, "log": log, "errormsg": errorMsg}
  89. class SSH:
  90. """ Ssh implementation of this """
  91. def __init__(self, user, sshkey_file, host, command, bootdir, host_log, errorMessage = None):
  92. self.user = user
  93. self.sshkey_file = sshkey_file
  94. self.host = host
  95. self.command = command
  96. self.bootdir = bootdir
  97. self.errorMessage = errorMessage
  98. self.host_log = host_log
  99. pass
  100. def run(self):
  101. sshcommand = ["ssh",
  102. "-o", "ConnectTimeOut=60",
  103. "-o", "StrictHostKeyChecking=no",
  104. "-o", "BatchMode=yes",
  105. "-tt", # Should prevent "tput: No value for $TERM and no -T specified" warning
  106. "-i", self.sshkey_file,
  107. self.user + "@" + self.host, self.command]
  108. if DEBUG:
  109. self.host_log.write("Running ssh command " + ' '.join(sshcommand))
  110. self.host_log.write("==========================")
  111. self.host_log.write("\nCommand start time " + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
  112. sshstat = subprocess.Popen(sshcommand, stdout=subprocess.PIPE,
  113. stderr=subprocess.PIPE)
  114. log = sshstat.communicate()
  115. errorMsg = log[1]
  116. if self.errorMessage and sshstat.returncode != 0:
  117. errorMsg = self.errorMessage + "\n" + errorMsg
  118. log = log[0] + "\n" + errorMsg
  119. self.host_log.write(log)
  120. self.host_log.write("SSH command execution finished")
  121. self.host_log.write("host=" + self.host + ", exitcode=" + str(sshstat.returncode))
  122. self.host_log.write("Command end time " + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
  123. return {"exitstatus": sshstat.returncode, "log": log, "errormsg": errorMsg}
  124. class Bootstrap(threading.Thread):
  125. """ Bootstrap the agent on a separate host"""
  126. TEMP_FOLDER = DEFAULT_AGENT_TEMP_FOLDER
  127. OS_CHECK_SCRIPT_FILENAME = "os_check_type.py"
  128. AMBARI_REPO_FILENAME = "ambari"
  129. SETUP_SCRIPT_FILENAME = "setupAgent.py"
  130. PASSWORD_FILENAME = "host_pass"
  131. ambari_commons="/usr/lib/python2.6/site-packages/ambari_commons"
  132. def __init__(self, host, shared_state):
  133. threading.Thread.__init__(self)
  134. self.host = host
  135. self.shared_state = shared_state
  136. self.status = {
  137. "start_time": None,
  138. "return_code": None,
  139. }
  140. log_file = os.path.join(self.shared_state.bootdir, self.host + ".log")
  141. self.host_log = HostLog(log_file)
  142. self.daemon = True
  143. if self.is_ubuntu():
  144. self.AMBARI_REPO_FILENAME = self.AMBARI_REPO_FILENAME + ".list"
  145. else:
  146. self.AMBARI_REPO_FILENAME = self.AMBARI_REPO_FILENAME + ".repo"
  147. def getRemoteName(self, filename):
  148. full_name = os.path.join(self.TEMP_FOLDER, filename)
  149. remote_files = self.shared_state.remote_files
  150. if not remote_files.has_key(full_name):
  151. remote_files[full_name] = self.generateRandomFileName(full_name)
  152. return remote_files[full_name]
  153. def generateRandomFileName(self, filename):
  154. if filename is None:
  155. return self.getUtime()
  156. else:
  157. name, ext = os.path.splitext(filename)
  158. return str(name) + str(self.getUtime()) + str(ext)
  159. # This method is needed to implement the descriptor protocol (make object
  160. # to pass self reference to mockups)
  161. def __get__(self, obj, objtype):
  162. def _call(*args, **kwargs):
  163. self(obj, *args, **kwargs)
  164. return _call
  165. def is_suse(self):
  166. if os.path.isfile("/etc/issue"):
  167. if "suse" in open("/etc/issue").read().lower():
  168. return True
  169. return False
  170. def is_ubuntu(self):
  171. if self.getServerFamily()[0] == "ubuntu":
  172. return True
  173. return False
  174. def getRepoDir(self):
  175. """ Ambari repo file for Ambari."""
  176. if self.is_suse():
  177. return "/etc/zypp/repos.d"
  178. elif self.is_ubuntu():
  179. return "/etc/apt/sources.list.d"
  180. else:
  181. return "/etc/yum.repos.d"
  182. def getRepoFile(self):
  183. """ Ambari repo file for Ambari."""
  184. return os.path.join(self.getRepoDir(), self.AMBARI_REPO_FILENAME)
  185. def getOsCheckScript(self):
  186. return os.path.join(self.shared_state.script_dir, self.OS_CHECK_SCRIPT_FILENAME)
  187. def getOsCheckScriptRemoteLocation(self):
  188. return self.getRemoteName(self.OS_CHECK_SCRIPT_FILENAME)
  189. def getCommonFunctionsRemoteLocation(self):
  190. return self.TEMP_FOLDER;
  191. def getUtime(self):
  192. return int(time.time())
  193. def getPasswordFile(self):
  194. return self.getRemoteName(self.PASSWORD_FILENAME)
  195. def hasPassword(self):
  196. password_file = self.shared_state.password_file
  197. return password_file is not None and password_file != 'null'
  198. def createTargetDir(self):
  199. # Creating target dir
  200. self.host_log.write("==========================\n")
  201. self.host_log.write("Creating target directory...")
  202. params = self.shared_state
  203. user = params.user
  204. command = "sudo mkdir -p {0} ; sudo chown -R {1} {0}".format(self.TEMP_FOLDER,quote_bash_args(params.user))
  205. ssh = SSH(params.user, params.sshkey_file, self.host, command,
  206. params.bootdir, self.host_log)
  207. retcode = ssh.run()
  208. self.host_log.write("\n")
  209. return retcode
  210. def copyOsCheckScript(self):
  211. # Copying the os check script file
  212. fileToCopy = self.getOsCheckScript()
  213. target = self.getOsCheckScriptRemoteLocation()
  214. params = self.shared_state
  215. self.host_log.write("==========================\n")
  216. self.host_log.write("Copying OS type check script...")
  217. scp = SCP(params.user, params.sshkey_file, self.host, fileToCopy,
  218. target, params.bootdir, self.host_log)
  219. result = scp.run()
  220. self.host_log.write("\n")
  221. return result
  222. def copyCommonFunctions(self):
  223. # Copying the os check script file
  224. fileToCopy = self.ambari_commons
  225. target = self.getCommonFunctionsRemoteLocation()
  226. params = self.shared_state
  227. self.host_log.write("==========================\n")
  228. self.host_log.write("Copying common functions script...")
  229. scp = SCP(params.user, params.sshkey_file, self.host, fileToCopy,
  230. target, params.bootdir, self.host_log)
  231. result = scp.run()
  232. self.host_log.write("\n")
  233. return result
  234. def getMoveRepoFileWithPasswordCommand(self, targetDir):
  235. return "sudo -S mv " + str(self.getRemoteName(self.AMBARI_REPO_FILENAME)) \
  236. + " " + os.path.join(str(targetDir), self.AMBARI_REPO_FILENAME) + \
  237. " < " + str(self.getPasswordFile())
  238. def getMoveRepoFileWithoutPasswordCommand(self, targetDir):
  239. return "sudo mv " + str(self.getRemoteName(self.AMBARI_REPO_FILENAME)) \
  240. + " " + os.path.join(str(targetDir), self.AMBARI_REPO_FILENAME)
  241. def getMoveRepoFileCommand(self, targetDir):
  242. if self.hasPassword():
  243. return self.getMoveRepoFileWithPasswordCommand(targetDir)
  244. else:
  245. return self.getMoveRepoFileWithoutPasswordCommand(targetDir)
  246. def getAptUpdateCommand(self):
  247. return "sudo apt-get update -o Dir::Etc::sourcelist=\"%s/%s\" -o API::Get::List-Cleanup=\"0\" --no-list-cleanup" %\
  248. ("sources.list.d", self.AMBARI_REPO_FILENAME)
  249. def copyNeededFiles(self):
  250. # Copying the files
  251. fileToCopy = self.getRepoFile()
  252. target = self.getRemoteName(self.AMBARI_REPO_FILENAME)
  253. self.host_log.write("==========================\n")
  254. self.host_log.write("Copying repo file to 'tmp' folder...")
  255. params = self.shared_state
  256. scp = SCP(params.user, params.sshkey_file, self.host, fileToCopy,
  257. target, params.bootdir, self.host_log)
  258. retcode1 = scp.run()
  259. self.host_log.write("\n")
  260. # Move file to repo dir
  261. self.host_log.write("==========================\n")
  262. self.host_log.write("Moving file to repo dir...")
  263. targetDir = self.getRepoDir()
  264. command = self.getMoveRepoFileCommand(targetDir)
  265. ssh = SSH(params.user, params.sshkey_file, self.host, command,
  266. params.bootdir, self.host_log)
  267. retcode2 = ssh.run()
  268. self.host_log.write("\n")
  269. # Update repo cache for ubuntu OS
  270. if self.is_ubuntu():
  271. self.host_log.write("==========================\n")
  272. self.host_log.write("Update apt cache of repository...")
  273. command = self.getAptUpdateCommand()
  274. ssh = SSH(params.user, params.sshkey_file, self.host, command,
  275. params.bootdir, self.host_log)
  276. retcode2 = ssh.run()
  277. self.host_log.write("\n")
  278. self.host_log.write("==========================\n")
  279. self.host_log.write("Copying setup script file...")
  280. fileToCopy = params.setup_agent_file
  281. target = self.getRemoteName(self.SETUP_SCRIPT_FILENAME)
  282. scp = SCP(params.user, params.sshkey_file, self.host, fileToCopy,
  283. target, params.bootdir, self.host_log)
  284. retcode3 = scp.run()
  285. self.host_log.write("\n")
  286. return max(retcode1["exitstatus"], retcode2["exitstatus"], retcode3["exitstatus"])
  287. def getAmbariVersion(self):
  288. ambari_version = self.shared_state.ambari_version
  289. if ambari_version is None or ambari_version == "null":
  290. return ""
  291. else:
  292. return ambari_version
  293. def getAmbariPort(self):
  294. server_port = self.shared_state.server_port
  295. if server_port is None or server_port == "null":
  296. return "null"
  297. else:
  298. return server_port
  299. def getRunSetupWithPasswordCommand(self, expected_hostname):
  300. setupFile = self.getRemoteName(self.SETUP_SCRIPT_FILENAME)
  301. passphrase = os.environ[AMBARI_PASSPHRASE_VAR_NAME]
  302. server = self.shared_state.ambari_server
  303. user_run_as = self.shared_state.user_run_as
  304. version = self.getAmbariVersion()
  305. port = self.getAmbariPort()
  306. passwordFile = self.getPasswordFile()
  307. return "sudo -S python " + str(setupFile) + " " + str(expected_hostname) + \
  308. " " + str(passphrase) + " " + str(server)+ " " + quote_bash_args(str(user_run_as)) + " " + str(version) + \
  309. " " + str(port) + " < " + str(passwordFile)
  310. def getRunSetupWithoutPasswordCommand(self, expected_hostname):
  311. setupFile=self.getRemoteName(self.SETUP_SCRIPT_FILENAME)
  312. passphrase=os.environ[AMBARI_PASSPHRASE_VAR_NAME]
  313. server=self.shared_state.ambari_server
  314. user_run_as = self.shared_state.user_run_as
  315. version=self.getAmbariVersion()
  316. port=self.getAmbariPort()
  317. return "sudo python " + str(setupFile) + " " + str(expected_hostname) + \
  318. " " + str(passphrase) + " " + str(server)+ " " + quote_bash_args(str(user_run_as)) + " " + str(version) + \
  319. " " + str(port)
  320. def getRunSetupCommand(self, expected_hostname):
  321. if self.hasPassword():
  322. return self.getRunSetupWithPasswordCommand(expected_hostname)
  323. else:
  324. return self.getRunSetupWithoutPasswordCommand(expected_hostname)
  325. def runOsCheckScript(self):
  326. params = self.shared_state
  327. self.host_log.write("==========================\n")
  328. self.host_log.write("Running OS type check...")
  329. command = "chmod a+x %s && %s %s %s" % \
  330. (self.getOsCheckScriptRemoteLocation(),
  331. PYTHON_ENV, self.getOsCheckScriptRemoteLocation(), params.cluster_os_type)
  332. ssh = SSH(params.user, params.sshkey_file, self.host, command,
  333. params.bootdir, self.host_log)
  334. retcode = ssh.run()
  335. self.host_log.write("\n")
  336. return retcode
  337. def runSetupAgent(self):
  338. params = self.shared_state
  339. self.host_log.write("==========================\n")
  340. self.host_log.write("Running setup agent script...")
  341. command = self.getRunSetupCommand(self.host)
  342. ssh = SSH(params.user, params.sshkey_file, self.host, command,
  343. params.bootdir, self.host_log)
  344. retcode = ssh.run()
  345. self.host_log.write("\n")
  346. return retcode
  347. def createDoneFile(self, retcode):
  348. """ Creates .done file for current host. These files are later read from Java code.
  349. If .done file for any host is not created, the bootstrap will hang or fail due to timeout"""
  350. params = self.shared_state
  351. doneFilePath = os.path.join(params.bootdir, self.host + ".done")
  352. if not os.path.exists(doneFilePath):
  353. doneFile = open(doneFilePath, "w+")
  354. doneFile.write(str(retcode))
  355. doneFile.close()
  356. def getServerFamily(self):
  357. '''Return server OS family and version'''
  358. cot = re.search("([^\d]+)([\d]*)", self.shared_state.cluster_os_type)
  359. return cot.group(1).lower(),cot.group(2).lower()
  360. def checkSudoPackage(self):
  361. """ Checking 'sudo' package on remote host """
  362. self.host_log.write("==========================\n")
  363. self.host_log.write("Checking 'sudo' package on remote host...")
  364. params = self.shared_state
  365. if self.getServerFamily()[0] == "ubuntu":
  366. command = "dpkg --get-selections|grep -e '^sudo\s*install'"
  367. else:
  368. command = "rpm -qa | grep -e '^sudo\-'"
  369. ssh = SSH(params.user, params.sshkey_file, self.host, command,
  370. params.bootdir, self.host_log,
  371. errorMessage="Error: Sudo command is not available. "
  372. "Please install the sudo command.")
  373. retcode = ssh.run()
  374. self.host_log.write("\n")
  375. return retcode
  376. def copyPasswordFile(self):
  377. # Copy the password file
  378. self.host_log.write("Copying password file to 'tmp' folder...")
  379. params = self.shared_state
  380. scp = SCP(params.user, params.sshkey_file, self.host, params.password_file,
  381. self.getPasswordFile(), params.bootdir, self.host_log)
  382. retcode1 = scp.run()
  383. self.copied_password_file = True
  384. # Change password file mode to 600
  385. self.host_log.write("Changing password file mode...")
  386. command = "chmod 600 " + self.getPasswordFile()
  387. ssh = SSH(params.user, params.sshkey_file, self.host, command,
  388. params.bootdir, self.host_log)
  389. retcode2 = ssh.run()
  390. self.host_log.write("Copying password file finished")
  391. return max(retcode1["exitstatus"], retcode2["exitstatus"])
  392. def changePasswordFileModeOnHost(self):
  393. # Change password file mode to 600
  394. self.host_log.write("Changing password file mode...")
  395. params = self.shared_state
  396. command = "chmod 600 " + self.getPasswordFile()
  397. ssh = SSH(params.user, params.sshkey_file, self.host, command,
  398. params.bootdir, self.host_log)
  399. retcode = ssh.run()
  400. self.host_log.write("Change password file mode on host finished")
  401. return retcode
  402. def deletePasswordFile(self):
  403. # Deleting the password file
  404. self.host_log.write("Deleting password file...")
  405. params = self.shared_state
  406. command = "rm " + self.getPasswordFile()
  407. ssh = SSH(params.user, params.sshkey_file, self.host, command,
  408. params.bootdir, self.host_log)
  409. retcode = ssh.run()
  410. self.host_log.write("Deleting password file finished")
  411. return retcode
  412. def try_to_execute(self, action):
  413. last_retcode = {"exitstatus": 177, "log":"Try to execute '{0}'".format(str(action)), "errormsg":"Execute of '{0}' failed".format(str(action))}
  414. try:
  415. retcode = action()
  416. if isinstance(retcode, int):
  417. last_retcode["exitstatus"] = retcode
  418. else:
  419. last_retcode = retcode
  420. except Exception:
  421. self.host_log.write("Traceback: " + traceback.format_exc())
  422. return last_retcode
  423. def run(self):
  424. """ Copy files and run commands on remote host """
  425. self.status["start_time"] = time.time()
  426. # Population of action queue
  427. action_queue = [self.createTargetDir,
  428. self.copyCommonFunctions,
  429. self.copyOsCheckScript,
  430. self.runOsCheckScript,
  431. self.checkSudoPackage
  432. ]
  433. if self.hasPassword():
  434. action_queue.extend([self.copyPasswordFile,
  435. self.changePasswordFileModeOnHost])
  436. action_queue.extend([
  437. self.copyNeededFiles,
  438. self.runSetupAgent,
  439. ])
  440. # Execution of action queue
  441. last_retcode = 0
  442. while action_queue and last_retcode == 0:
  443. action = action_queue.pop(0)
  444. ret = self.try_to_execute(action)
  445. last_retcode = ret["exitstatus"]
  446. err_msg = ret["errormsg"]
  447. std_out = ret["log"]
  448. # Checking execution result
  449. if last_retcode != 0:
  450. message = "ERROR: Bootstrap of host {0} fails because previous action " \
  451. "finished with non-zero exit code ({1})\nERROR MESSAGE: {2}\nSTDOUT: {3}".format(self.host, last_retcode, err_msg, std_out)
  452. self.host_log.write(message)
  453. logging.error(message)
  454. # Try to delete password file
  455. if self.hasPassword() and self.copied_password_file:
  456. retcode = self.try_to_execute(self.deletePasswordFile)
  457. if retcode["exitstatus"] != 0:
  458. message = "WARNING: failed to delete password file " \
  459. "at {0}. Please delete it manually".format(self.getPasswordFile())
  460. self.host_log.write(message)
  461. logging.warn(message)
  462. self.createDoneFile(last_retcode)
  463. self.status["return_code"] = last_retcode
  464. def getStatus(self):
  465. return self.status
  466. def interruptBootstrap(self):
  467. """
  468. Thread is not really interrupted (moreover, Python seems to have no any
  469. stable/portable/official api to do that: _Thread__stop only marks thread
  470. as stopped). The bootstrap thread is marked as a daemon at init, and will
  471. exit when the main parallel bootstrap thread exits.
  472. All we need to do now is a proper logging and creating .done file
  473. """
  474. self.host_log.write("Automatic Agent registration timed out (timeout = {0} seconds). " \
  475. "Check your network connectivity and retry registration," \
  476. " or use manual agent registration.".format(HOST_BOOTSTRAP_TIMEOUT))
  477. self.createDoneFile(199)
  478. class PBootstrap:
  479. """ BootStrapping the agents on a list of hosts"""
  480. def __init__(self, hosts, sharedState):
  481. self.hostlist = hosts
  482. self.sharedState = sharedState
  483. pass
  484. def run_bootstrap(self, host):
  485. bootstrap = Bootstrap(host, self.sharedState)
  486. bootstrap.start()
  487. return bootstrap
  488. def run(self):
  489. """ Run up to MAX_PARALLEL_BOOTSTRAPS at a time in parallel """
  490. logging.info("Executing parallel bootstrap")
  491. queue = list(self.hostlist)
  492. queue.reverse()
  493. running_list = []
  494. finished_list = []
  495. while queue or running_list: # until queue is not empty or not all parallel bootstraps are
  496. # poll running bootstraps
  497. for bootstrap in running_list:
  498. if bootstrap.getStatus()["return_code"] is not None:
  499. finished_list.append(bootstrap)
  500. else:
  501. starttime = bootstrap.getStatus()["start_time"]
  502. elapsedtime = time.time() - starttime
  503. if elapsedtime > HOST_BOOTSTRAP_TIMEOUT:
  504. # bootstrap timed out
  505. logging.warn("Bootstrap at host {0} timed out and will be "
  506. "interrupted".format(bootstrap.host))
  507. bootstrap.interruptBootstrap()
  508. finished_list.append(bootstrap)
  509. # Remove finished from the running list
  510. running_list[:] = [b for b in running_list if not b in finished_list]
  511. # Start new bootstraps from the queue
  512. free_slots = MAX_PARALLEL_BOOTSTRAPS - len(running_list)
  513. for i in range(free_slots):
  514. if queue:
  515. next_host = queue.pop()
  516. bootstrap = self.run_bootstrap(next_host)
  517. running_list.append(bootstrap)
  518. time.sleep(POLL_INTERVAL_SEC)
  519. logging.info("Finished parallel bootstrap")
  520. class SharedState:
  521. def __init__(self, user, sshkey_file, script_dir, boottmpdir, setup_agent_file,
  522. ambari_server, cluster_os_type, ambari_version, server_port,
  523. user_run_as, password_file = None):
  524. self.hostlist_to_remove_password_file = None
  525. self.user = user
  526. self.sshkey_file = sshkey_file
  527. self.bootdir = boottmpdir
  528. self.script_dir = script_dir
  529. self.setup_agent_file = setup_agent_file
  530. self.ambari_server = ambari_server
  531. self.cluster_os_type = cluster_os_type
  532. self.ambari_version = ambari_version
  533. self.user_run_as = user_run_as
  534. self.password_file = password_file
  535. self.statuses = None
  536. self.server_port = server_port
  537. self.remote_files = {}
  538. self.ret = {}
  539. pass
  540. def main(argv=None):
  541. scriptDir = os.path.realpath(os.path.dirname(argv[0]))
  542. onlyargs = argv[1:]
  543. if len(onlyargs) < 3:
  544. sys.stderr.write("Usage: <comma separated hosts> "
  545. "<tmpdir for storage> <user> <sshkey_file> <agent setup script>"
  546. " <ambari-server name> <cluster os type> <ambari version> <ambari port> <user_run_as> <passwordFile>\n")
  547. sys.exit(2)
  548. pass
  549. #Parse the input
  550. hostList = onlyargs[0].split(",")
  551. bootdir = onlyargs[1]
  552. user = onlyargs[2]
  553. sshkey_file = onlyargs[3]
  554. setupAgentFile = onlyargs[4]
  555. ambariServer = onlyargs[5]
  556. cluster_os_type = onlyargs[6]
  557. ambariVersion = onlyargs[7]
  558. server_port = onlyargs[8]
  559. user_run_as = onlyargs[9]
  560. passwordFile = onlyargs[10]
  561. # ssh doesn't like open files
  562. subprocess.Popen(["chmod", "600", sshkey_file], stdout=subprocess.PIPE)
  563. if passwordFile is not None and passwordFile != 'null':
  564. subprocess.Popen(["chmod", "600", passwordFile], stdout=subprocess.PIPE)
  565. logging.info("BootStrapping hosts " + pprint.pformat(hostList) +
  566. " using " + scriptDir + " cluster primary OS: " + cluster_os_type +
  567. " with user '" + user + "' sshKey File " + sshkey_file + " password File " + passwordFile +\
  568. " using tmp dir " + bootdir + " ambari: " + ambariServer +"; server_port: " + server_port +\
  569. "; ambari version: " + ambariVersion+"; user_run_as: " + user_run_as)
  570. sharedState = SharedState(user, sshkey_file, scriptDir, bootdir, setupAgentFile,
  571. ambariServer, cluster_os_type, ambariVersion,
  572. server_port, user_run_as, passwordFile)
  573. pbootstrap = PBootstrap(hostList, sharedState)
  574. pbootstrap.run()
  575. return 0 # Hack to comply with current usage
  576. if __name__ == '__main__':
  577. logging.basicConfig(level=logging.DEBUG)
  578. main(sys.argv)