cluster.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. '''
  2. Licensed to the Apache Software Foundation (ASF) under one
  3. or more contributor license agreements. See the NOTICE file
  4. distributed with this work for additional information
  5. regarding copyright ownership. The ASF licenses this file
  6. to you under the Apache License, Version 2.0 (the
  7. "License"); you may not use this file except in compliance
  8. with the License. You may obtain a copy of the License at
  9. http://www.apache.org/licenses/LICENSE-2.0
  10. Unless required by applicable law or agreed to in writing, software
  11. distributed under the License is distributed on an "AS IS" BASIS,
  12. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. See the License for the specific language governing permissions and
  14. limitations under the License.
  15. '''
  16. import subprocess
  17. import time
  18. from config import Config
  19. from docker import Docker
  20. from vm import VM
  21. class Cluster:
  22. def __init__(self):
  23. self.cluster_name = ""
  24. self.VMs_num = 0
  25. self.VM_list = []
  26. # read cluster info from a file
  27. def load_cluster_info(self, filename):
  28. file = open(filename)
  29. self.cluster_name = file.next().split()[1]
  30. self.VMs_num = int(file.next().split()[1])
  31. for VM_index in range(0, self.VMs_num):
  32. vm = VM(file.next().split()[1])
  33. docker_num = int(file.next().split()[1])
  34. for Docker_index in range(0, docker_num):
  35. line = file.next()
  36. IP = line.split()[0].split("/")[0]
  37. mask = line.split()[0].split("/")[1]
  38. hostname = line.split()[1]
  39. docker = Docker(IP, mask, hostname)
  40. vm.add_docker(docker)
  41. self.VM_list.append(vm)
  42. file.close()
  43. def __extract_VM_IP__(self, GCE_info_file_name):
  44. f = open(GCE_info_file_name)
  45. lines = f.readlines()
  46. f.close()
  47. ip_list = []
  48. for line in lines:
  49. tokens = line.split()
  50. ip_list.append(tokens[1])
  51. return ip_list[1:]
  52. # request a new cluster
  53. def request_GCE_cluster(self, vms_num, docker_num, cluster_name):
  54. # reload configuration file
  55. config = Config()
  56. config.load()
  57. # request cluster
  58. gce_key = config.ATTRIBUTES["GCE_controller_key_file"]
  59. gce_login = config.ATTRIBUTES["GCE_controller_user"] + "@" + config.ATTRIBUTES["GCE_controller_IP"]
  60. gce_up_cmd = "gce up " + cluster_name + " " + str(vms_num) + " " + config.ATTRIBUTES["GCE_VM_type"] + \
  61. " " + config.ATTRIBUTES["GCE_VM_OS"]
  62. subprocess.call(["ssh", "-o", "StrictHostKeyChecking=no", "-i", gce_key, gce_login, gce_up_cmd])
  63. print "cluster launched successufully, wait 5 seconds for cluster info ... ..."
  64. time.sleep(5)
  65. # request cluster info
  66. gce_info_output_file = open(config.ATTRIBUTES["GCE_info_output"], "w")
  67. gce_info_cmd = "gce info " + cluster_name
  68. subprocess.call(["ssh", "-o", "StrictHostKeyChecking=no", "-i", gce_key, gce_login, gce_info_cmd], \
  69. stdout=gce_info_output_file)
  70. gce_info_output_file.close()
  71. print "cluster info is saved to file " + config.ATTRIBUTES["GCE_info_output"]
  72. # prepare all attributes of the cluster, write to a file
  73. VM_IP_list = self.__extract_VM_IP__(config.ATTRIBUTES["GCE_info_output"])
  74. self.generate_cluster_info(VM_IP_list, cluster_name, docker_num)
  75. self.overwrite_to_file(config.ATTRIBUTES["cluster_info_file"])
  76. # server need this file to resolve the host names of the agents
  77. self.export_hostnames(config.ATTRIBUTES["Docker_hostname_info"])
  78. # save info to file
  79. def overwrite_to_file(self, filename):
  80. file = open(filename, "w")
  81. file.write("cluster_name: " + self.cluster_name + "\n")
  82. file.write("VMs_num: " + str(self.VMs_num) + "\n")
  83. for vm in self.VM_list:
  84. file.write("\t\t")
  85. file.write("VM_IP: " + vm.external_ip + "\n")
  86. file.write("\t\t")
  87. file.write("Docker_num: " + str(len(vm.docker_list)) + "\n")
  88. for docker in vm.docker_list:
  89. file.write("\t\t\t\t")
  90. file.write(docker.IP + "/" + docker.mask + " " + docker.hostname + "\n")
  91. file.close()
  92. def __increase_IP__(self, base_IP, increase):
  93. IP = [int(base_IP[0]), int(base_IP[1]), int(base_IP[2]), int(base_IP[3])]
  94. IP[3] = IP[3] + increase
  95. for index in reversed(range(0, 4)):
  96. if IP[index] > 255:
  97. IP[index - 1] = IP[index - 1] + IP[index] / 256
  98. IP[index] = IP[index] % 256
  99. return IP
  100. # generate VM and docker info for this cluster
  101. # set up parameter as this info
  102. def generate_cluster_info(self, VM_IP_list, cluster_name, docker_num):
  103. config = Config()
  104. config.load()
  105. Docker_IP_base = config.ATTRIBUTES["Docker_IP_base"].split(".")
  106. Docker_IP_mask = config.ATTRIBUTES["Docker_IP_mask"]
  107. VM_index = 0
  108. for VM_IP in VM_IP_list:
  109. vm = VM(VM_IP)
  110. for Docker_index in range(0, docker_num):
  111. total_Docker_index = VM_index * docker_num + Docker_index
  112. docker_IP = self.__increase_IP__(Docker_IP_base, total_Docker_index)
  113. docker_IP_str = str(docker_IP[0]) + "." + str(docker_IP[1]) + "." + \
  114. str(docker_IP[2]) + "." + str(docker_IP[3])
  115. docker_hostname = cluster_name + "-" + str(VM_index) + "-" + str(Docker_index)
  116. docker = Docker(docker_IP_str, str(Docker_IP_mask), docker_hostname)
  117. # print docker
  118. vm.add_docker(docker)
  119. VM_index = VM_index + 1
  120. self.VM_list.append(vm)
  121. self.VMs_num = len(VM_IP_list)
  122. self.cluster_name = cluster_name
  123. # run all dockers for all the VMs in the cluster
  124. # upload necessary file to each machine in cluster, run launcher_docker.py in each machine with parameter
  125. def run_docker_on_cluster(self, server_external_IP, server_Weave_IP):
  126. config = Config()
  127. config.load()
  128. for vm in self.VM_list:
  129. # upload necessary file to each machine in cluster
  130. VM_external_IP = vm.external_ip
  131. VM_directory = "root@" + VM_external_IP + ":" + config.ATTRIBUTES["VM_code_directory"]
  132. VM_key = config.ATTRIBUTES["GCE_VM_key_file"]
  133. subprocess.call(["scp", "-o", "StrictHostKeyChecking=no", "-i", VM_key, "-r", ".", VM_directory])
  134. # run launcher_docker.py in each machine with parameters
  135. subprocess.call(["ssh", "-o", "StrictHostKeyChecking=no", "-t", "-i", VM_key, \
  136. "root@" + VM_external_IP, \
  137. "cd " + config.ATTRIBUTES["VM_code_directory"] + "; python launcher_docker.py" + \
  138. " " + VM_external_IP + " " + server_Weave_IP + " " + server_external_IP])
  139. # export host names to a file
  140. def export_hostnames(self, filename):
  141. hostname_file = open(filename, "w")
  142. for vm in self.VM_list:
  143. for docker in vm.docker_list:
  144. hostname_file.write(docker.IP)
  145. hostname_file.write(" ")
  146. hostname_file.write(docker.hostname)
  147. hostname_file.write("\n")
  148. hostname_file.close()