cluster.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605
  1. """
  2. Licensed to the Apache Software Foundation (ASF) under one
  3. or more contributor license agreements. See the NOTICE file
  4. distributed with this work for additional information
  5. regarding copyright ownership. The ASF licenses this file
  6. to you under the Apache License, Version 2.0 (the
  7. "License"); you may not use this file except in compliance
  8. with the License. You may obtain a copy of the License at
  9. http://www.apache.org/licenses/LICENSE-2.0
  10. Unless required by applicable law or agreed to in writing, software
  11. distributed under the License is distributed on an "AS IS" BASIS,
  12. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. See the License for the specific language governing permissions and
  14. limitations under the License.
  15. """
  16. import subprocess
  17. import datetime
  18. from config import Config
  19. from docker import Docker
  20. from vm import VM
  21. import os
  22. import time
  23. from log import Log
  24. from data import Data
  25. class Cluster:
  26. """
  27. The Cluster instance holds a list of VMs,
  28. it has the methods to request cluster, generate information and run all Ambari-agent and Ambari-server
  29. """
  30. # The constants represent the state of the cluster
  31. # A newly requested cluster is in FREE state
  32. STATE_FREE = "FREE"
  33. # A cluster with running Ambari-server and Ambari-agents is in RUNNING state
  34. STATE_RUNNING = "RUNNING"
  35. # A cluster is merged into another cluster and running, is in MERGE state
  36. # the name of the extended cluster is directly following the state String in JSON
  37. STATE_MERGE = "MERGE"
  38. def __init__(self):
  39. self.cluster_name = ""
  40. self.state = ""
  41. self.create_time = ""
  42. # The list should only has one or zero VM, which holds the Ambari-server
  43. self.ambari_server_vm = []
  44. # The list of VMs, with Ambari-agents directly inside (not in Docker)
  45. self.service_server_vm_list = []
  46. # The list of VMs, each will hold multiple Docker containers with Ambari-agent inside
  47. self.ambari_agent_vm_list = []
  48. def _get_int_interval(self, int_list):
  49. """
  50. get the interval of the integer list
  51. example: input[4,5,6,1,2,3], output [(1,3),(4,6)]
  52. example: input[4,5,6,100,2,3], output [(2,3),(4,6),(100,100)]
  53. :param int_list: the list of integer
  54. :return: a tuple, each tuple has 2 integer, representing one interval
  55. """
  56. interval_list = []
  57. int_list.sort()
  58. begin = None
  59. end = None
  60. for integer in int_list:
  61. if begin is None:
  62. begin = integer
  63. end = integer
  64. else:
  65. if integer == end + 1:
  66. end = integer
  67. else:
  68. interval_list.append((begin, end))
  69. begin = integer
  70. end = integer
  71. if begin is not None:
  72. interval_list.append((begin, end))
  73. return interval_list
  74. def print_description(self):
  75. print "cluster name: ", self.cluster_name
  76. print "create time: ", self.create_time
  77. print "state: ", self.state
  78. print
  79. print "Ambari Server: "
  80. ambari_server_vm = self.get_ambari_server_vm()
  81. if ambari_server_vm is None:
  82. print "None"
  83. else:
  84. print ambari_server_vm.domain_name, " ", ambari_server_vm.external_ip, " ",\
  85. ambari_server_vm.weave_internal_ip
  86. print
  87. print "Service Server with Ambari Agent directly installed: "
  88. if len(self.service_server_vm_list) == 0:
  89. print "None"
  90. for vm in self.service_server_vm_list:
  91. print vm.weave_domain_name, " ", vm.external_ip, " ", vm.weave_internal_ip
  92. print
  93. print "Ambari Agent in Docker Container: "
  94. int_list = []
  95. for vm in self.ambari_agent_vm_list:
  96. for docker in vm.docker_list:
  97. int_list.append(int(docker.get_index()))
  98. interval_list = self._get_int_interval(int_list)
  99. for interval in interval_list:
  100. interval_str = ""
  101. if interval[0] == interval[1]:
  102. interval_str = str(interval(0))
  103. else:
  104. interval_str = "[{0}-{1}]".format(interval[0], interval[1])
  105. print Docker.get_pattern_presentation(self.cluster_name, interval_str)
  106. print
  107. def get_agent_vm(self, vm_ip):
  108. """
  109. get the VM instance which holds Docker containers from the cluster instance
  110. :param vm_ip: the external IP of the target VM
  111. :return: the VM instance with the specified iP
  112. """
  113. for vm in self.ambari_agent_vm_list:
  114. if vm.external_ip == vm_ip:
  115. return vm
  116. def get_ambari_server_vm(self):
  117. """
  118. get the VM instance which hold the Ambari-server
  119. :return: the VM instance hold the Ambari-server, or None if no Ambari-server in this cluster
  120. """
  121. for vm in self.ambari_server_vm:
  122. return vm
  123. return None
  124. def get_service_server_vm(self, vm_ip):
  125. """
  126. get the VM instance which directly hold the Ambari-agent
  127. :param vm_ip: the external IP of the target VM
  128. :return:
  129. """
  130. for vm in self.service_server_vm_list:
  131. if vm.external_ip == vm_ip:
  132. return vm
  133. def to_json(self):
  134. """
  135. create a map to hold the information of the Cluster instance
  136. :return: A map, which is JSON format object.
  137. """
  138. cluster = {}
  139. cluster["cluster_name"] = self.cluster_name
  140. cluster["create_time"] = self.create_time
  141. cluster["state"] = self.state
  142. cluster["ambari_server_vm"] = []
  143. for vm in self.ambari_server_vm:
  144. cluster["ambari_server_vm"].append(vm.to_json())
  145. cluster["service_server_vm_list"] = []
  146. for vm in self.service_server_vm_list:
  147. cluster["service_server_vm_list"].append(vm.to_json())
  148. cluster["ambari_agent_vm_list"] = []
  149. for vm in self.ambari_agent_vm_list:
  150. cluster["ambari_agent_vm_list"].append(vm.to_json())
  151. return cluster
  152. @staticmethod
  153. def load_from_json(cluster_name):
  154. """
  155. load the cluster information from json file
  156. :param cluster_name: the name of the cluster
  157. :return: a Cluster instance or None if no such cluster
  158. """
  159. data = Data()
  160. json_data = data.read_cluster_json(cluster_name)
  161. if json_data is None:
  162. return None
  163. ambari_server_vm = []
  164. service_server_vm_list = []
  165. ambari_agent_vm_list = []
  166. for vm_json in json_data["ambari_server_vm"]:
  167. ambari_server_vm.append(VM.load_from_json(vm_json))
  168. for vm_json in json_data["service_server_vm_list"]:
  169. service_server_vm_list.append(VM.load_from_json(vm_json))
  170. for vm_json in json_data["ambari_agent_vm_list"]:
  171. ambari_agent_vm_list.append(VM.load_from_json(vm_json))
  172. cluster = Cluster()
  173. cluster.cluster_name = cluster_name
  174. cluster.state = json_data["state"]
  175. cluster.create_time = json_data["create_time"]
  176. cluster.ambari_server_vm = ambari_server_vm
  177. cluster.service_server_vm_list = service_server_vm_list
  178. cluster.ambari_agent_vm_list = ambari_agent_vm_list
  179. return cluster
  180. def _extract_vm_fqdn_ip(self, gce_info_file_name):
  181. """
  182. exatract domain name and IP address of VMs from the output file of GCE
  183. :param gce_info_file_name: output file of "GCE info" command
  184. :return: A list of tuple, each tuple has domain name and IP of a VM
  185. """
  186. lines = []
  187. with open(gce_info_file_name) as f:
  188. lines = f.readlines()
  189. vm_list = []
  190. # the first line in the output file is title
  191. for line in lines[1:]:
  192. tokens = line.split()
  193. fqdn_ip = (tokens[0], tokens[1])
  194. vm_list.append(fqdn_ip)
  195. return vm_list
  196. def request_vm(self, name, vm_num, gce_vm_type, gce_vm_os, gce_extra_cmd):
  197. """
  198. Request VMs from GCE
  199. :param name: the name prefix of all requesting VMs
  200. :param vm_num: the number of VM
  201. :param gce_vm_type: the type of VM
  202. :param gce_vm_os: the OS of VM
  203. :param gce_extra_cmd: extra command for requesting the VMs
  204. :return: A list of tuple, each tuple has domain name and IP of a VM
  205. """
  206. gce_key = Config.ATTRIBUTES["gce_controller_key_file"]
  207. gce_login = "{0}@{1}".format(Config.ATTRIBUTES["gce_controller_user"], Config.ATTRIBUTES["gce_controller_ip"])
  208. gce_up_cmd = "gce up {0} {1} {2} {3} {4}".format(name, vm_num, gce_vm_type, gce_vm_os, gce_extra_cmd)
  209. subprocess.call(["ssh", "-o", "StrictHostKeyChecking=no", "-i", gce_key, gce_login, gce_up_cmd])
  210. Log.write("cluster launched, wait for cluster info ... ...")
  211. fqdn_ip_pairs = []
  212. # wait for long enough. the more VM, more time it takes.
  213. for retry in range(max(6, vm_num)):
  214. time.sleep(10)
  215. # request cluster info
  216. with open(Config.ATTRIBUTES["gce_info_output"], "w") as gce_info_output_file:
  217. gce_info_cmd = "gce info {0}".format(name)
  218. subprocess.call(["ssh", "-o", "StrictHostKeyChecking=no", "-i", gce_key, gce_login, gce_info_cmd],
  219. stdout=gce_info_output_file)
  220. fqdn_ip_pairs = self._extract_vm_fqdn_ip(Config.ATTRIBUTES["gce_info_output"])
  221. if len(fqdn_ip_pairs) == vm_num:
  222. Log.write("Get info for all ", str(len(fqdn_ip_pairs)), " VMs successfully")
  223. break
  224. Log.write("Only get info for ", str(len(fqdn_ip_pairs)), " VMs, retry ... ...")
  225. return fqdn_ip_pairs
  226. def request_ambari_server_vm(self, name):
  227. """
  228. request a VM for holding Ambari-server
  229. :param name: the name prefix of all requesting VMs
  230. :return: A list of tuple, each tuple has domain name and IP of a VM
  231. """
  232. # only 1 ambari server
  233. vm_num = 1
  234. gce_vm_type = Config.ATTRIBUTES["ambari_server_vm_type"]
  235. gce_vm_os = Config.ATTRIBUTES["ambari_server_vm_os"]
  236. gce_extra_cmd = ""
  237. if "ambari_server_vm_extra" in Config.ATTRIBUTES:
  238. gce_extra_cmd = Config.ATTRIBUTES["ambari_server_vm_extra"]
  239. fqdn_ip_pairs = self.request_vm(name, vm_num, gce_vm_type, gce_vm_os, gce_extra_cmd)
  240. return fqdn_ip_pairs
  241. def reqeust_service_server_vm(self, vm_num, name):
  242. """
  243. Request VMs to directly hold Ambari-agent (not inside Docker)
  244. :param vm_num: the number of VM to request
  245. :param name: the name prefix of all requesting VMs
  246. :return: A list of tuple, each tuple has domain name and IP of a VM
  247. """
  248. gce_vm_type = Config.ATTRIBUTES["service_server_vm_type"]
  249. gce_vm_os = Config.ATTRIBUTES["service_server_vm_os"]
  250. gce_extra_cmd = ""
  251. if "service_server_vm_extra" in Config.ATTRIBUTES:
  252. gce_extra_cmd = Config.ATTRIBUTES["service_server_vm_extra"]
  253. fqdn_ip_pairs = self.request_vm(name, vm_num, gce_vm_type, gce_vm_os, gce_extra_cmd)
  254. return fqdn_ip_pairs
  255. def request_agent_vm(self, vm_num, name):
  256. """
  257. Request VMs to hold Docker containers, each with Ambari-agent inside
  258. :param vm_num: the number of VM to request
  259. :param name: the name prefix of all requesting VMs
  260. :return: A list of tuple, each tuple has domain name and IP of a VM
  261. """
  262. gce_vm_type = Config.ATTRIBUTES["ambari_agent_vm_type"]
  263. gce_vm_os = Config.ATTRIBUTES["ambari_agent_vm_os"]
  264. gce_extra_disk = ""
  265. if "ambari_agent_vm_extra_disk" in Config.ATTRIBUTES:
  266. gce_extra_disk = Config.ATTRIBUTES["ambari_agent_vm_extra_disk"]
  267. fqdn_ip_pairs = self.request_vm(name, vm_num, gce_vm_type, gce_vm_os, gce_extra_disk)
  268. return fqdn_ip_pairs
  269. def request_gce_cluster(self, ambari_agent_vm_num, docker_num,
  270. service_server_num, with_ambari_server, cluster_name):
  271. """
  272. Request a cluster from GCE
  273. :param ambari_agent_vm_num: number of VMs to hold Docker containers
  274. :param docker_num: number of Docker containers inside each VM
  275. :param service_server_num: number of VMs which has Ambari-agent directly installed (not in Docker)
  276. :param with_ambari_server: True or False, whether to request a VM to hold Ambari-server
  277. :param cluster_name: the name of the cluster
  278. :return: None
  279. """
  280. ambari_server_fqdn_ip_pairs = []
  281. if with_ambari_server is True:
  282. ambari_server_fqdn_ip_pairs = self.request_ambari_server_vm(VM.get_ambari_server_vm_name(cluster_name))
  283. service_server_fqdn_ip_pairs = self.reqeust_service_server_vm(service_server_num,
  284. VM.get_service_server_vm_name(cluster_name))
  285. ambari_agent_fqdn_ip_pairs = self.request_agent_vm(ambari_agent_vm_num,
  286. VM.get_ambari_agent_vm_name(cluster_name))
  287. # prepare all attributes of the cluster, write to a file
  288. self.generate_cluster_info(cluster_name, ambari_server_fqdn_ip_pairs, service_server_fqdn_ip_pairs,
  289. ambari_agent_fqdn_ip_pairs, docker_num)
  290. def generate_cluster_info(self, cluster_name, ambari_server_fqdn_ip_pairs, service_server_fqdn_ip_pairs,
  291. ambari_agent_fqdn_ip_pairs, docker_num):
  292. """
  293. generate VM and docker info for this cluster
  294. set up parameter of the class instance as this info
  295. :param cluster_name: the name of the cluster
  296. :param ambari_server_fqdn_ip_pairs: the domain name and IP pairs for Ambari-server
  297. :param service_server_fqdn_ip_pairs: the domain name and IP pairs for VMs with Ambari-agent installed
  298. :param ambari_agent_fqdn_ip_pairs: the domain name and IP pairs for VM with Docker containers
  299. :param docker_num: the number of Dockers inside each VMs
  300. :return: None
  301. """
  302. weave_ip_base = Config.ATTRIBUTES["weave_ip_base"]
  303. weave_ip_mask = Config.ATTRIBUTES["weave_ip_mask"]
  304. current_ip = weave_ip_base
  305. for vm_domain_name, vm_ip in ambari_server_fqdn_ip_pairs:
  306. current_ip = self._increase_ip(current_ip, 1)
  307. weave_dns_ip = current_ip
  308. vm = VM(vm_ip, vm_domain_name, weave_dns_ip, weave_ip_mask)
  309. current_ip = self._increase_ip(current_ip, 1)
  310. vm.weave_internal_ip = current_ip
  311. self.ambari_server_vm.append(vm)
  312. for vm_domain_name, vm_ip in service_server_fqdn_ip_pairs:
  313. current_ip = self._increase_ip(current_ip, 1)
  314. weave_dns_ip = current_ip
  315. vm = VM(vm_ip, vm_domain_name, weave_dns_ip, weave_ip_mask)
  316. current_ip = self._increase_ip(current_ip, 1)
  317. vm.weave_internal_ip = current_ip
  318. self.service_server_vm_list.append(vm)
  319. vm_index = 0
  320. for vm_domain_name, vm_ip in ambari_agent_fqdn_ip_pairs:
  321. current_ip = self._increase_ip(current_ip, 1)
  322. weave_dns_ip = current_ip
  323. vm = VM(vm_ip, vm_domain_name, weave_dns_ip, weave_ip_mask)
  324. for docker_index in range(0, docker_num):
  325. current_ip = self._increase_ip(current_ip, 1)
  326. docker_ip_str = current_ip
  327. total_docker_index = vm_index * docker_num + docker_index
  328. docker_domain_name = Docker.get_weave_domain_name(cluster_name, total_docker_index)
  329. docker = Docker(docker_ip_str, str(weave_ip_mask), docker_domain_name)
  330. vm.add_docker(docker)
  331. vm_index += 1
  332. self.ambari_agent_vm_list.append(vm)
  333. self.cluster_name = cluster_name
  334. self.create_time = str(datetime.datetime.now())
  335. self.state = Cluster.STATE_FREE
  336. # update config file.
  337. # This step makes the user avoid reconfiguring the IP for next cluster creation
  338. Config.update("weave", "weave_ip_base", current_ip)
  339. def _increase_ip(self, base_ip_str, increase):
  340. """
  341. increase the IP address.
  342. example: 192.168.1.1, increased by 1: 192.168.1.2
  343. example: 192.168.1.254, increased by 2: 192.168.2.1
  344. :param base_ip_str: the IP to be increased
  345. :param increase: the amount of increase
  346. :return: the new IP address, in String
  347. """
  348. base_ip = base_ip_str.split(".")
  349. new_ip = [int(base_ip[0]), int(base_ip[1]), int(base_ip[2]), int(base_ip[3])]
  350. new_ip[3] = new_ip[3] + increase
  351. for index in reversed(range(0, 4)):
  352. if new_ip[index] > 255:
  353. new_ip[index - 1] += (new_ip[index] / 256)
  354. new_ip[index] %= 256
  355. return "{0}.{1}.{2}.{3}".format(new_ip[0], new_ip[1], new_ip[2], new_ip[3])
  356. def _scp_upload(self, vm_external_ip):
  357. """
  358. upload all the code in a VM
  359. :param vm_external_ip: the external IP of the VM
  360. :return: None
  361. """
  362. # upload necessary file to VM
  363. vm_directory = "{0}@{1}:{2}".format(Config.ATTRIBUTES["vm_user"], vm_external_ip,
  364. Config.ATTRIBUTES["vm_code_directory"])
  365. vm_key = Config.ATTRIBUTES["vm_key_file"]
  366. upload_return_code = 0
  367. with open(os.devnull, 'w') as shutup:
  368. upload_return_code = subprocess.call(["scp", "-o", "StrictHostKeyChecking=no", "-i",
  369. vm_key, "-r", ".", vm_directory],
  370. stdout=shutup, stderr=shutup)
  371. if upload_return_code == 0:
  372. Log.write("VM ", vm_external_ip, " file upload succeed")
  373. else:
  374. Log.write("VM ", vm_external_ip, " file upload fail")
  375. def _set_executable_permission(self, vm_external_ip):
  376. """
  377. Set all shell file to be executable
  378. :param vm_external_ip: the external IP of the VM
  379. :return: None
  380. """
  381. vm_ssh_login = "{0}@{1}".format(Config.ATTRIBUTES["vm_user"], vm_external_ip)
  382. vm_ssh_cd_cmd = "cd {0}".format(Config.ATTRIBUTES["vm_code_directory"])
  383. vm_ssh_chmod_cmd = "chmod a+x **/*.sh"
  384. vm_ssh_cmd = "{0};{1}".format(vm_ssh_cd_cmd, vm_ssh_chmod_cmd)
  385. vm_key = Config.ATTRIBUTES["vm_key_file"]
  386. with open(os.devnull, 'w') as shutup:
  387. subprocess.Popen(["ssh", "-o", "StrictHostKeyChecking=no", "-t", "-i", vm_key,
  388. vm_ssh_login, vm_ssh_cmd],
  389. stdout=shutup, stderr=shutup)
  390. def run_cluster(self, server_weave_ip, server_external_ip):
  391. """
  392. Run all Ambari-agents and Ambari-server in the cluster in parallel
  393. Wait until all processes finish
  394. :param server_weave_ip: the Weave IP of Ambari-server
  395. :param server_external_ip: the external IP of Ambari-server
  396. :return: None
  397. """
  398. process_list = {}
  399. process_list.update(self.run_ambari_server_asyn())
  400. process_list.update(self.run_service_server_asyn(server_weave_ip, server_external_ip))
  401. process_list.update(self.run_docker_on_cluster_asyn(server_weave_ip, server_external_ip))
  402. terminate_state_list = {}
  403. for hostname in process_list:
  404. terminate_state_list[hostname] = False
  405. Log.write("Wait for all VMs to finish configuration ... ...")
  406. # Wait for all configuration subprocesses
  407. while True:
  408. all_finished = True
  409. for hostname in process_list:
  410. output_file, output_file_path, process = process_list[hostname]
  411. if terminate_state_list[hostname] is False:
  412. all_finished = False
  413. returncode = process.poll()
  414. if returncode is None:
  415. continue
  416. else:
  417. Log.write("VM ", hostname, " configuration completed, return code: ", str(returncode),
  418. ", output file path: ", output_file_path)
  419. terminate_state_list[hostname] = True
  420. output_file.close()
  421. else:
  422. pass
  423. if all_finished:
  424. break
  425. time.sleep(5)
  426. Log.write("All VM configuration completed.")
  427. def run_ambari_server_asyn(self):
  428. """
  429. Run configuration for Ambari-server in this cluster
  430. Set up Ambari-server and Weave network
  431. The method is NON-BLOCK
  432. :return: a map of tuple, the key of the map is the host name of the VM,
  433. the tuple has 3 elements: the file handler of the output of the VM,
  434. the file path of the output of the VM,
  435. and the process object of configuration for the VM
  436. """
  437. process_list = {}
  438. for vm in self.ambari_server_vm:
  439. vm_external_ip = vm.external_ip
  440. self._scp_upload(vm_external_ip)
  441. self._set_executable_permission(vm_external_ip)
  442. vm_output_file_path = vm.get_ssh_output_file_path()
  443. vm_output_file = open(vm_output_file_path, "w")
  444. # ssh install server
  445. vm_ssh_login = "{0}@{1}".format(Config.ATTRIBUTES["vm_user"], vm_external_ip)
  446. vm_ssh_cd_cmd = "cd {0}".format(Config.ATTRIBUTES["vm_code_directory"])
  447. vm_ssh_python_cmd = "python launcher_ambari_server.py {0}".format(self.cluster_name)
  448. vm_ssh_cmd = "{0};{1}".format(vm_ssh_cd_cmd, vm_ssh_python_cmd)
  449. vm_key = Config.ATTRIBUTES["vm_key_file"]
  450. Log.write(vm_ssh_python_cmd)
  451. process = subprocess.Popen(["ssh", "-o", "StrictHostKeyChecking=no", "-t", "-i", vm_key,
  452. vm_ssh_login, vm_ssh_cmd],
  453. stdout=vm_output_file, stderr=vm_output_file)
  454. process_list[vm.hostname] = (vm_output_file, vm_output_file_path, process)
  455. Log.write("Configuring VM ", vm.hostname, " ... ...")
  456. return process_list
  457. def run_service_server_asyn(self, server_weave_ip, server_external_ip):
  458. """
  459. Run configuration, set up Ambari-agent in this VM, and the Weave network
  460. :param server_weave_ip: the Weave IP of the Ambari-server
  461. :param server_external_ip: the external IP of the Ambari-server
  462. The method is NON-BLOCK
  463. :return: a map of tuple, the key of the map is the host name of the VM,
  464. the tuple has 3 elements: the file handler of the output of the VM,
  465. the file path of the output of the VM,
  466. and the process object of configuration for the VM
  467. """
  468. process_list = {}
  469. for vm in self.service_server_vm_list:
  470. vm_external_ip = vm.external_ip
  471. self._scp_upload(vm_external_ip)
  472. self._set_executable_permission(vm_external_ip)
  473. vm_output_file_path = vm.get_ssh_output_file_path()
  474. vm_output_file = open(vm_output_file_path, "w")
  475. # ssh install server
  476. vm_ssh_login = "{0}@{1}".format(Config.ATTRIBUTES["vm_user"], vm_external_ip)
  477. vm_ssh_cd_cmd = "cd {0}".format(Config.ATTRIBUTES["vm_code_directory"])
  478. vm_ssh_python_cmd = "python launcher_service_server.py {0} {1} {2} {3}".format(
  479. vm_external_ip, server_weave_ip, server_external_ip, self.cluster_name)
  480. vm_ssh_cmd = "{0};{1}".format(vm_ssh_cd_cmd, vm_ssh_python_cmd)
  481. vm_key = Config.ATTRIBUTES["vm_key_file"]
  482. Log.write(vm_ssh_python_cmd)
  483. process = subprocess.Popen(["ssh", "-o", "StrictHostKeyChecking=no", "-t", "-i", vm_key,
  484. vm_ssh_login, vm_ssh_cmd],
  485. stdout=vm_output_file, stderr=vm_output_file)
  486. process_list[vm.hostname] = (vm_output_file, vm_output_file_path, process)
  487. Log.write("Configuring VM ", vm.hostname, " ... ...")
  488. return process_list
  489. def run_docker_on_cluster_asyn(self, server_weave_ip, server_external_ip):
  490. """
  491. Run configuration, set up Docker and Weave network
  492. run all containers, each with Ambari-agent inside
  493. :param server_weave_ip: the Weave IP of the Ambari-server
  494. :param server_external_ip: the external IP of the Ambari-server
  495. The method is NON-BLOCK
  496. :return: a map of tuple, the key of the map is the host name of the VM,
  497. the tuple has 3 elements: the file handler of the output of the VM,
  498. the file path of the output of the VM,
  499. and the process object of configuration for the VM
  500. """
  501. process_list = {}
  502. for vm in self.ambari_agent_vm_list:
  503. vm_external_ip = vm.external_ip
  504. self._scp_upload(vm_external_ip)
  505. self._set_executable_permission(vm_external_ip)
  506. vm_output_file_path = vm.get_ssh_output_file_path()
  507. vm_output_file = open(vm_output_file_path, "w")
  508. vm_ssh_login = "{0}@{1}".format(Config.ATTRIBUTES["vm_user"], vm_external_ip)
  509. vm_ssh_cd_cmd = "cd {0}".format(Config.ATTRIBUTES["vm_code_directory"])
  510. vm_ssh_python_cmd = "python launcher_docker.py {0} {1} {2} {3}".format(
  511. vm_external_ip, server_weave_ip, server_external_ip, self.cluster_name)
  512. vm_ssh_cmd = "{0};{1}".format(vm_ssh_cd_cmd, vm_ssh_python_cmd)
  513. vm_key = Config.ATTRIBUTES["vm_key_file"]
  514. Log.write(vm_ssh_python_cmd)
  515. process = subprocess.Popen(["ssh", "-o", "StrictHostKeyChecking=no", "-t", "-i", vm_key,
  516. vm_ssh_login, vm_ssh_cmd],
  517. stdout=vm_output_file, stderr=vm_output_file)
  518. process_list[vm.hostname] = (vm_output_file, vm_output_file_path, process)
  519. Log.write("Configuring VM ", vm.hostname, " ... ...")
  520. return process_list