|
@@ -1,4 +1,4 @@
|
|
|
-'''
|
|
|
+"""
|
|
|
Licensed to the Apache Software Foundation (ASF) under one
|
|
|
or more contributor license agreements. See the NOTICE file
|
|
|
distributed with this work for additional information
|
|
@@ -14,162 +14,572 @@ distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
See the License for the specific language governing permissions and
|
|
|
limitations under the License.
|
|
|
-'''
|
|
|
+"""
|
|
|
|
|
|
import subprocess
|
|
|
-import time
|
|
|
+import datetime
|
|
|
from config import Config
|
|
|
from docker import Docker
|
|
|
from vm import VM
|
|
|
-
|
|
|
+import os
|
|
|
+import time
|
|
|
+from log import Log
|
|
|
+from data import Data
|
|
|
|
|
|
|
|
|
class Cluster:
|
|
|
+ """
|
|
|
+ The Cluster instance holds a list of VMs,
|
|
|
+ it has the methods to request cluster, generate information and run all Ambari-agent and Ambari-server
|
|
|
+ """
|
|
|
+
|
|
|
+ # The constants represent the state of the cluster
|
|
|
+ # A newly requested cluster is in FREE state
|
|
|
+ STATE_FREE = "FREE"
|
|
|
+ # A cluster with running Ambari-server and Ambari-agents is in RUNNING state
|
|
|
+ STATE_RUNNING = "RUNNING"
|
|
|
+ # A cluster is merged into another cluster and running, is in MERGE state
|
|
|
+ # the name of the extended cluster is directly following the state String in JSON
|
|
|
+ STATE_MERGE = "MERGE"
|
|
|
+
|
|
|
def __init__(self):
|
|
|
self.cluster_name = ""
|
|
|
- self.VMs_num = 0
|
|
|
- self.VM_list = []
|
|
|
-
|
|
|
- # read cluster info from a file
|
|
|
- def load_cluster_info(self, filename):
|
|
|
- file = open(filename)
|
|
|
-
|
|
|
- self.cluster_name = file.next().split()[1]
|
|
|
- self.VMs_num = int(file.next().split()[1])
|
|
|
- for VM_index in range(0, self.VMs_num):
|
|
|
- vm = VM(file.next().split()[1])
|
|
|
- docker_num = int(file.next().split()[1])
|
|
|
- for Docker_index in range(0, docker_num):
|
|
|
- line = file.next()
|
|
|
- IP = line.split()[0].split("/")[0]
|
|
|
- mask = line.split()[0].split("/")[1]
|
|
|
- hostname = line.split()[1]
|
|
|
- docker = Docker(IP, mask, hostname)
|
|
|
- vm.add_docker(docker)
|
|
|
- self.VM_list.append(vm)
|
|
|
+ self.state = ""
|
|
|
+ self.create_time = ""
|
|
|
+ # The list should only has one or zero VM, which holds the Ambari-server
|
|
|
+ self.ambari_server_vm = []
|
|
|
+ # The list of VMs, with Ambari-agents directly inside (not in Docker)
|
|
|
+ self.service_server_vm_list = []
|
|
|
+ # The list of VMs, each will hold multiple Docker containers with Ambari-agent inside
|
|
|
+ self.ambari_agent_vm_list = []
|
|
|
+
|
|
|
+ def _get_int_interval(self, int_list):
|
|
|
+ """
|
|
|
+ get the interval of the integer list
|
|
|
+ example: input[4,5,6,1,2,3], output [(1,3),(4,6)]
|
|
|
+ example: input[4,5,6,100,2,3], output [(2,3),(4,6),(100,100)]
|
|
|
+ :param int_list: the list of integer
|
|
|
+ :return: a tuple, each tuple has 2 integer, representing one interval
|
|
|
+ """
|
|
|
+ interval_list = []
|
|
|
+ int_list.sort()
|
|
|
+
|
|
|
+ begin = None
|
|
|
+ end = None
|
|
|
+ for integer in int_list:
|
|
|
+ if begin is None:
|
|
|
+ begin = integer
|
|
|
+ end = integer
|
|
|
+ else:
|
|
|
+ if integer == end + 1:
|
|
|
+ end = integer
|
|
|
+ else:
|
|
|
+ interval_list.append((begin, end))
|
|
|
+ begin = integer
|
|
|
+ end = integer
|
|
|
+
|
|
|
+ if begin is not None:
|
|
|
+ interval_list.append((begin, end))
|
|
|
+
|
|
|
+ return interval_list
|
|
|
+
|
|
|
+ def print_description(self):
|
|
|
+ print "cluster name: ", self.cluster_name
|
|
|
+ print "create time: ", self.create_time
|
|
|
+ print "state: ", self.state
|
|
|
+ print
|
|
|
+
|
|
|
+ print "Ambari Server: "
|
|
|
+ ambari_server_vm = self.get_ambari_server_vm()
|
|
|
+ if ambari_server_vm is None:
|
|
|
+ print "None"
|
|
|
+ else:
|
|
|
+ print ambari_server_vm.domain_name, " ", ambari_server_vm.external_ip, " ",\
|
|
|
+ ambari_server_vm.weave_internal_ip
|
|
|
+ print
|
|
|
+
|
|
|
+ print "Service Server with Ambari Agent directly installed: "
|
|
|
+ if len(self.service_server_vm_list) == 0:
|
|
|
+ print "None"
|
|
|
+ for vm in self.service_server_vm_list:
|
|
|
+ print vm.weave_domain_name, " ", vm.external_ip, " ", vm.weave_internal_ip
|
|
|
+ print
|
|
|
+
|
|
|
+ print "Ambari Agent in Docker Container: "
|
|
|
+ int_list = []
|
|
|
+ for vm in self.ambari_agent_vm_list:
|
|
|
+ for docker in vm.docker_list:
|
|
|
+ int_list.append(int(docker.get_index()))
|
|
|
+ interval_list = self._get_int_interval(int_list)
|
|
|
+ for interval in interval_list:
|
|
|
+ interval_str = ""
|
|
|
+ if interval[0] == interval[1]:
|
|
|
+ interval_str = str(interval(0))
|
|
|
+ else:
|
|
|
+ interval_str = "[{0}-{1}]".format(interval[0], interval[1])
|
|
|
+ print Docker.get_pattern_presentation(self.cluster_name, interval_str)
|
|
|
+ print
|
|
|
+
|
|
|
+ def get_agent_vm(self, vm_ip):
|
|
|
+ """
|
|
|
+ get the VM instance which holds Docker containers from the cluster instance
|
|
|
+ :param vm_ip: the external IP of the target VM
|
|
|
+ :return: the VM instance with the specified iP
|
|
|
+ """
|
|
|
+ for vm in self.ambari_agent_vm_list:
|
|
|
+ if vm.external_ip == vm_ip:
|
|
|
+ return vm
|
|
|
+
|
|
|
+ def get_ambari_server_vm(self):
|
|
|
+ """
|
|
|
+ get the VM instance which hold the Ambari-server
|
|
|
+ :return: the VM instance hold the Ambari-server, or None if no Ambari-server in this cluster
|
|
|
+ """
|
|
|
+ for vm in self.ambari_server_vm:
|
|
|
+ return vm
|
|
|
+ return None
|
|
|
+
|
|
|
+ def get_service_server_vm(self, vm_ip):
|
|
|
+ """
|
|
|
+ get the VM instance which directly hold the Ambari-agent
|
|
|
+ :param vm_ip: the external IP of the target VM
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ for vm in self.service_server_vm_list:
|
|
|
+ if vm.external_ip == vm_ip:
|
|
|
+ return vm
|
|
|
+
|
|
|
+ def to_json(self):
|
|
|
+ """
|
|
|
+ create a map to hold the information of the Cluster instance
|
|
|
+ :return: A map, which is JSON format object.
|
|
|
+ """
|
|
|
+ cluster = {}
|
|
|
+ cluster["cluster_name"] = self.cluster_name
|
|
|
+ cluster["create_time"] = self.create_time
|
|
|
+ cluster["state"] = self.state
|
|
|
+
|
|
|
+ cluster["ambari_server_vm"] = []
|
|
|
+ for vm in self.ambari_server_vm:
|
|
|
+ cluster["ambari_server_vm"].append(vm.to_json())
|
|
|
|
|
|
- file.close()
|
|
|
+ cluster["service_server_vm_list"] = []
|
|
|
+ for vm in self.service_server_vm_list:
|
|
|
+ cluster["service_server_vm_list"].append(vm.to_json())
|
|
|
|
|
|
- def __extract_VM_IP__(self, GCE_info_file_name):
|
|
|
- f = open(GCE_info_file_name)
|
|
|
- lines = f.readlines()
|
|
|
- f.close()
|
|
|
+ cluster["ambari_agent_vm_list"] = []
|
|
|
+ for vm in self.ambari_agent_vm_list:
|
|
|
+ cluster["ambari_agent_vm_list"].append(vm.to_json())
|
|
|
|
|
|
- ip_list = []
|
|
|
- for line in lines:
|
|
|
+ return cluster
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def load_from_json(cluster_name):
|
|
|
+ """
|
|
|
+ load the cluster information from json file
|
|
|
+ :param cluster_name: the name of the cluster
|
|
|
+ :return: a Cluster instance or None if no such cluster
|
|
|
+ """
|
|
|
+ data = Data()
|
|
|
+ json_data = data.read_cluster_json(cluster_name)
|
|
|
+ if json_data is None:
|
|
|
+ return None
|
|
|
+
|
|
|
+ ambari_server_vm = []
|
|
|
+ service_server_vm_list = []
|
|
|
+ ambari_agent_vm_list = []
|
|
|
+
|
|
|
+ for vm_json in json_data["ambari_server_vm"]:
|
|
|
+ ambari_server_vm.append(VM.load_from_json(vm_json))
|
|
|
+
|
|
|
+ for vm_json in json_data["service_server_vm_list"]:
|
|
|
+ service_server_vm_list.append(VM.load_from_json(vm_json))
|
|
|
+
|
|
|
+ for vm_json in json_data["ambari_agent_vm_list"]:
|
|
|
+ ambari_agent_vm_list.append(VM.load_from_json(vm_json))
|
|
|
+
|
|
|
+ cluster = Cluster()
|
|
|
+ cluster.cluster_name = cluster_name
|
|
|
+ cluster.state = json_data["state"]
|
|
|
+ cluster.create_time = json_data["create_time"]
|
|
|
+ cluster.ambari_server_vm = ambari_server_vm
|
|
|
+ cluster.service_server_vm_list = service_server_vm_list
|
|
|
+ cluster.ambari_agent_vm_list = ambari_agent_vm_list
|
|
|
+ return cluster
|
|
|
+
|
|
|
+ def _extract_vm_fqdn_ip(self, gce_info_file_name):
|
|
|
+ """
|
|
|
+ exatract domain name and IP address of VMs from the output file of GCE
|
|
|
+ :param gce_info_file_name: output file of "GCE info" command
|
|
|
+ :return: A list of tuple, each tuple has domain name and IP of a VM
|
|
|
+ """
|
|
|
+ lines = []
|
|
|
+ with open(gce_info_file_name) as f:
|
|
|
+ lines = f.readlines()
|
|
|
+
|
|
|
+ vm_list = []
|
|
|
+ # the first line in the output file is title
|
|
|
+ for line in lines[1:]:
|
|
|
tokens = line.split()
|
|
|
- ip_list.append(tokens[1])
|
|
|
- return ip_list[1:]
|
|
|
-
|
|
|
- # request a new cluster
|
|
|
- def request_GCE_cluster(self, vms_num, docker_num, cluster_name):
|
|
|
- # reload configuration file
|
|
|
- config = Config()
|
|
|
- config.load()
|
|
|
- # request cluster
|
|
|
- gce_key = config.ATTRIBUTES["GCE_controller_key_file"]
|
|
|
- gce_login = config.ATTRIBUTES["GCE_controller_user"] + "@" + config.ATTRIBUTES["GCE_controller_IP"]
|
|
|
- gce_up_cmd = "gce up " + cluster_name + " " + str(vms_num) + " " + config.ATTRIBUTES["GCE_VM_type"] + \
|
|
|
- " " + config.ATTRIBUTES["GCE_VM_OS"]
|
|
|
+ fqdn_ip = (tokens[0], tokens[1])
|
|
|
+ vm_list.append(fqdn_ip)
|
|
|
+ return vm_list
|
|
|
+
|
|
|
+ def request_vm(self, name, vm_num, gce_vm_type, gce_vm_os, gce_extra_cmd):
|
|
|
+ """
|
|
|
+ Request VMs from GCE
|
|
|
+ :param name: the name prefix of all requesting VMs
|
|
|
+ :param vm_num: the number of VM
|
|
|
+ :param gce_vm_type: the type of VM
|
|
|
+ :param gce_vm_os: the OS of VM
|
|
|
+ :param gce_extra_cmd: extra command for requesting the VMs
|
|
|
+ :return: A list of tuple, each tuple has domain name and IP of a VM
|
|
|
+ """
|
|
|
+ gce_key = Config.ATTRIBUTES["gce_controller_key_file"]
|
|
|
+ gce_login = "{0}@{1}".format(Config.ATTRIBUTES["gce_controller_user"], Config.ATTRIBUTES["gce_controller_ip"])
|
|
|
+ gce_up_cmd = "gce up {0} {1} {2} {3} {4}".format(name, vm_num, gce_vm_type, gce_vm_os, gce_extra_cmd)
|
|
|
subprocess.call(["ssh", "-o", "StrictHostKeyChecking=no", "-i", gce_key, gce_login, gce_up_cmd])
|
|
|
|
|
|
- print "cluster launched successufully, wait 5 seconds for cluster info ... ..."
|
|
|
- time.sleep(5)
|
|
|
+ Log.write("cluster launched, wait for cluster info ... ...")
|
|
|
+
|
|
|
+ fqdn_ip_pairs = []
|
|
|
+ # wait for long enough. the more VM, more time it takes.
|
|
|
+ for retry in range(max(6, vm_num)):
|
|
|
+ time.sleep(10)
|
|
|
+
|
|
|
+ # request cluster info
|
|
|
+ with open(Config.ATTRIBUTES["gce_info_output"], "w") as gce_info_output_file:
|
|
|
+ gce_info_cmd = "gce info {0}".format(name)
|
|
|
+ subprocess.call(["ssh", "-o", "StrictHostKeyChecking=no", "-i", gce_key, gce_login, gce_info_cmd],
|
|
|
+ stdout=gce_info_output_file)
|
|
|
+
|
|
|
+ fqdn_ip_pairs = self._extract_vm_fqdn_ip(Config.ATTRIBUTES["gce_info_output"])
|
|
|
+
|
|
|
+ if len(fqdn_ip_pairs) == vm_num:
|
|
|
+ Log.write("Get info for all ", str(len(fqdn_ip_pairs)), " VMs successfully")
|
|
|
+ break
|
|
|
+ Log.write("Only get info for ", str(len(fqdn_ip_pairs)), " VMs, retry ... ...")
|
|
|
+ return fqdn_ip_pairs
|
|
|
+
|
|
|
+ def request_ambari_server_vm(self, name):
|
|
|
+ """
|
|
|
+ request a VM for holding Ambari-server
|
|
|
+ :param name: the name prefix of all requesting VMs
|
|
|
+ :return: A list of tuple, each tuple has domain name and IP of a VM
|
|
|
+ """
|
|
|
+ # only 1 ambari server
|
|
|
+ vm_num = 1
|
|
|
+ gce_vm_type = Config.ATTRIBUTES["ambari_server_vm_type"]
|
|
|
+ gce_vm_os = Config.ATTRIBUTES["ambari_server_vm_os"]
|
|
|
+
|
|
|
+ gce_extra_cmd = ""
|
|
|
+ if "ambari_server_vm_extra" in Config.ATTRIBUTES:
|
|
|
+ gce_extra_cmd = Config.ATTRIBUTES["ambari_server_vm_extra"]
|
|
|
+
|
|
|
+ fqdn_ip_pairs = self.request_vm(name, vm_num, gce_vm_type, gce_vm_os, gce_extra_cmd)
|
|
|
+ return fqdn_ip_pairs
|
|
|
+
|
|
|
+ def reqeust_service_server_vm(self, vm_num, name):
|
|
|
+ """
|
|
|
+ Request VMs to directly hold Ambari-agent (not inside Docker)
|
|
|
+ :param vm_num: the number of VM to request
|
|
|
+ :param name: the name prefix of all requesting VMs
|
|
|
+ :return: A list of tuple, each tuple has domain name and IP of a VM
|
|
|
+ """
|
|
|
+ gce_vm_type = Config.ATTRIBUTES["service_server_vm_type"]
|
|
|
+ gce_vm_os = Config.ATTRIBUTES["service_server_vm_os"]
|
|
|
+
|
|
|
+ gce_extra_cmd = ""
|
|
|
+ if "service_server_vm_extra" in Config.ATTRIBUTES:
|
|
|
+ gce_extra_cmd = Config.ATTRIBUTES["service_server_vm_extra"]
|
|
|
+
|
|
|
+ fqdn_ip_pairs = self.request_vm(name, vm_num, gce_vm_type, gce_vm_os, gce_extra_cmd)
|
|
|
+ return fqdn_ip_pairs
|
|
|
|
|
|
- # request cluster info
|
|
|
- gce_info_output_file = open(config.ATTRIBUTES["GCE_info_output"], "w")
|
|
|
- gce_info_cmd = "gce info " + cluster_name
|
|
|
- subprocess.call(["ssh", "-o", "StrictHostKeyChecking=no", "-i", gce_key, gce_login, gce_info_cmd], \
|
|
|
- stdout=gce_info_output_file)
|
|
|
- gce_info_output_file.close()
|
|
|
- print "cluster info is saved to file " + config.ATTRIBUTES["GCE_info_output"]
|
|
|
+ def request_agent_vm(self, vm_num, name):
|
|
|
+ """
|
|
|
+ Request VMs to hold Docker containers, each with Ambari-agent inside
|
|
|
+ :param vm_num: the number of VM to request
|
|
|
+ :param name: the name prefix of all requesting VMs
|
|
|
+ :return: A list of tuple, each tuple has domain name and IP of a VM
|
|
|
+ """
|
|
|
+ gce_vm_type = Config.ATTRIBUTES["ambari_agent_vm_type"]
|
|
|
+ gce_vm_os = Config.ATTRIBUTES["ambari_agent_vm_os"]
|
|
|
+ gce_extra_disk = ""
|
|
|
+ if "ambari_agent_vm_extra_disk" in Config.ATTRIBUTES:
|
|
|
+ gce_extra_disk = Config.ATTRIBUTES["ambari_agent_vm_extra_disk"]
|
|
|
+
|
|
|
+ fqdn_ip_pairs = self.request_vm(name, vm_num, gce_vm_type, gce_vm_os, gce_extra_disk)
|
|
|
+ return fqdn_ip_pairs
|
|
|
+
|
|
|
+ def request_gce_cluster(self, ambari_agent_vm_num, docker_num,
|
|
|
+ service_server_num, with_ambari_server, cluster_name):
|
|
|
+ """
|
|
|
+ Request a cluster from GCE
|
|
|
+ :param ambari_agent_vm_num: number of VMs to hold Docker containers
|
|
|
+ :param docker_num: number of Docker containers inside each VM
|
|
|
+ :param service_server_num: number of VMs which has Ambari-agent directly installed (not in Docker)
|
|
|
+ :param with_ambari_server: True or False, whether to request a VM to hold Ambari-server
|
|
|
+ :param cluster_name: the name of the cluster
|
|
|
+ :return: None
|
|
|
+ """
|
|
|
+ ambari_server_fqdn_ip_pairs = []
|
|
|
+ if with_ambari_server is True:
|
|
|
+ ambari_server_fqdn_ip_pairs = self.request_ambari_server_vm(VM.get_ambari_server_vm_name(cluster_name))
|
|
|
+ service_server_fqdn_ip_pairs = self.reqeust_service_server_vm(service_server_num,
|
|
|
+ VM.get_service_server_vm_name(cluster_name))
|
|
|
+ ambari_agent_fqdn_ip_pairs = self.request_agent_vm(ambari_agent_vm_num,
|
|
|
+ VM.get_ambari_agent_vm_name(cluster_name))
|
|
|
|
|
|
# prepare all attributes of the cluster, write to a file
|
|
|
- VM_IP_list = self.__extract_VM_IP__(config.ATTRIBUTES["GCE_info_output"])
|
|
|
- self.generate_cluster_info(VM_IP_list, cluster_name, docker_num)
|
|
|
- self.overwrite_to_file(config.ATTRIBUTES["cluster_info_file"])
|
|
|
- # server need this file to resolve the host names of the agents
|
|
|
- self.export_hostnames(config.ATTRIBUTES["Docker_hostname_info"])
|
|
|
-
|
|
|
- # save info to file
|
|
|
- def overwrite_to_file(self, filename):
|
|
|
- file = open(filename, "w")
|
|
|
- file.write("cluster_name: " + self.cluster_name + "\n")
|
|
|
- file.write("VMs_num: " + str(self.VMs_num) + "\n")
|
|
|
-
|
|
|
- for vm in self.VM_list:
|
|
|
- file.write("\t\t")
|
|
|
- file.write("VM_IP: " + vm.external_ip + "\n")
|
|
|
- file.write("\t\t")
|
|
|
- file.write("Docker_num: " + str(len(vm.docker_list)) + "\n")
|
|
|
- for docker in vm.docker_list:
|
|
|
- file.write("\t\t\t\t")
|
|
|
- file.write(docker.IP + "/" + docker.mask + " " + docker.hostname + "\n")
|
|
|
+ self.generate_cluster_info(cluster_name, ambari_server_fqdn_ip_pairs, service_server_fqdn_ip_pairs,
|
|
|
+ ambari_agent_fqdn_ip_pairs, docker_num)
|
|
|
|
|
|
- file.close()
|
|
|
+ def generate_cluster_info(self, cluster_name, ambari_server_fqdn_ip_pairs, service_server_fqdn_ip_pairs,
|
|
|
+ ambari_agent_fqdn_ip_pairs, docker_num):
|
|
|
+ """
|
|
|
+ generate VM and docker info for this cluster
|
|
|
+ set up parameter of the class instance as this info
|
|
|
+ :param cluster_name: the name of the cluster
|
|
|
+ :param ambari_server_fqdn_ip_pairs: the domain name and IP pairs for Ambari-server
|
|
|
+ :param service_server_fqdn_ip_pairs: the domain name and IP pairs for VMs with Ambari-agent installed
|
|
|
+ :param ambari_agent_fqdn_ip_pairs: the domain name and IP pairs for VM with Docker containers
|
|
|
+ :param docker_num: the number of Dockers inside each VMs
|
|
|
+ :return: None
|
|
|
+ """
|
|
|
+ weave_ip_base = Config.ATTRIBUTES["weave_ip_base"]
|
|
|
+ weave_ip_mask = Config.ATTRIBUTES["weave_ip_mask"]
|
|
|
+ current_ip = weave_ip_base
|
|
|
|
|
|
- def __increase_IP__(self, base_IP, increase):
|
|
|
- IP = [int(base_IP[0]), int(base_IP[1]), int(base_IP[2]), int(base_IP[3])]
|
|
|
- IP[3] = IP[3] + increase
|
|
|
- for index in reversed(range(0, 4)):
|
|
|
- if IP[index] > 255:
|
|
|
- IP[index - 1] = IP[index - 1] + IP[index] / 256
|
|
|
- IP[index] = IP[index] % 256
|
|
|
- return IP
|
|
|
-
|
|
|
- # generate VM and docker info for this cluster
|
|
|
- # set up parameter as this info
|
|
|
- def generate_cluster_info(self, VM_IP_list, cluster_name, docker_num):
|
|
|
- config = Config()
|
|
|
- config.load()
|
|
|
- Docker_IP_base = config.ATTRIBUTES["Docker_IP_base"].split(".")
|
|
|
- Docker_IP_mask = config.ATTRIBUTES["Docker_IP_mask"]
|
|
|
-
|
|
|
- VM_index = 0
|
|
|
- for VM_IP in VM_IP_list:
|
|
|
- vm = VM(VM_IP)
|
|
|
-
|
|
|
- for Docker_index in range(0, docker_num):
|
|
|
- total_Docker_index = VM_index * docker_num + Docker_index
|
|
|
- docker_IP = self.__increase_IP__(Docker_IP_base, total_Docker_index)
|
|
|
- docker_IP_str = str(docker_IP[0]) + "." + str(docker_IP[1]) + "." + \
|
|
|
- str(docker_IP[2]) + "." + str(docker_IP[3])
|
|
|
- docker_hostname = cluster_name + "-" + str(VM_index) + "-" + str(Docker_index)
|
|
|
- docker = Docker(docker_IP_str, str(Docker_IP_mask), docker_hostname)
|
|
|
- # print docker
|
|
|
+ for vm_domain_name, vm_ip in ambari_server_fqdn_ip_pairs:
|
|
|
+ current_ip = self._increase_ip(current_ip, 1)
|
|
|
+ weave_dns_ip = current_ip
|
|
|
+ vm = VM(vm_ip, vm_domain_name, weave_dns_ip, weave_ip_mask)
|
|
|
+ current_ip = self._increase_ip(current_ip, 1)
|
|
|
+ vm.weave_internal_ip = current_ip
|
|
|
+ self.ambari_server_vm.append(vm)
|
|
|
+
|
|
|
+ for vm_domain_name, vm_ip in service_server_fqdn_ip_pairs:
|
|
|
+ current_ip = self._increase_ip(current_ip, 1)
|
|
|
+ weave_dns_ip = current_ip
|
|
|
+ vm = VM(vm_ip, vm_domain_name, weave_dns_ip, weave_ip_mask)
|
|
|
+ current_ip = self._increase_ip(current_ip, 1)
|
|
|
+ vm.weave_internal_ip = current_ip
|
|
|
+ self.service_server_vm_list.append(vm)
|
|
|
+
|
|
|
+ vm_index = 0
|
|
|
+ for vm_domain_name, vm_ip in ambari_agent_fqdn_ip_pairs:
|
|
|
+ current_ip = self._increase_ip(current_ip, 1)
|
|
|
+ weave_dns_ip = current_ip
|
|
|
+ vm = VM(vm_ip, vm_domain_name, weave_dns_ip, weave_ip_mask)
|
|
|
+
|
|
|
+ for docker_index in range(0, docker_num):
|
|
|
+ current_ip = self._increase_ip(current_ip, 1)
|
|
|
+ docker_ip_str = current_ip
|
|
|
+
|
|
|
+ total_docker_index = vm_index * docker_num + docker_index
|
|
|
+ docker_domain_name = Docker.get_weave_domain_name(cluster_name, total_docker_index)
|
|
|
+
|
|
|
+ docker = Docker(docker_ip_str, str(weave_ip_mask), docker_domain_name)
|
|
|
vm.add_docker(docker)
|
|
|
- VM_index = VM_index + 1
|
|
|
- self.VM_list.append(vm)
|
|
|
|
|
|
- self.VMs_num = len(VM_IP_list)
|
|
|
+ vm_index += 1
|
|
|
+ self.ambari_agent_vm_list.append(vm)
|
|
|
+
|
|
|
self.cluster_name = cluster_name
|
|
|
+ self.create_time = str(datetime.datetime.now())
|
|
|
+ self.state = Cluster.STATE_FREE
|
|
|
|
|
|
- # run all dockers for all the VMs in the cluster
|
|
|
- # upload necessary file to each machine in cluster, run launcher_docker.py in each machine with parameter
|
|
|
- def run_docker_on_cluster(self, server_external_IP, server_Weave_IP):
|
|
|
- config = Config()
|
|
|
- config.load()
|
|
|
-
|
|
|
- for vm in self.VM_list:
|
|
|
- # upload necessary file to each machine in cluster
|
|
|
- VM_external_IP = vm.external_ip
|
|
|
- VM_directory = "root@" + VM_external_IP + ":" + config.ATTRIBUTES["VM_code_directory"]
|
|
|
- VM_key = config.ATTRIBUTES["GCE_VM_key_file"]
|
|
|
- subprocess.call(["scp", "-o", "StrictHostKeyChecking=no", "-i", VM_key, "-r", ".", VM_directory])
|
|
|
-
|
|
|
- # run launcher_docker.py in each machine with parameters
|
|
|
- subprocess.call(["ssh", "-o", "StrictHostKeyChecking=no", "-t", "-i", VM_key, \
|
|
|
- "root@" + VM_external_IP, \
|
|
|
- "cd " + config.ATTRIBUTES["VM_code_directory"] + "; python launcher_docker.py" + \
|
|
|
- " " + VM_external_IP + " " + server_Weave_IP + " " + server_external_IP])
|
|
|
-
|
|
|
- # export host names to a file
|
|
|
- def export_hostnames(self, filename):
|
|
|
- hostname_file = open(filename, "w")
|
|
|
- for vm in self.VM_list:
|
|
|
- for docker in vm.docker_list:
|
|
|
- hostname_file.write(docker.IP)
|
|
|
- hostname_file.write(" ")
|
|
|
- hostname_file.write(docker.hostname)
|
|
|
- hostname_file.write("\n")
|
|
|
- hostname_file.close()
|
|
|
+ # update config file.
|
|
|
+ # This step makes the user avoid reconfiguring the IP for next cluster creation
|
|
|
+ Config.update("weave", "weave_ip_base", current_ip)
|
|
|
+
|
|
|
+ def _increase_ip(self, base_ip_str, increase):
|
|
|
+ """
|
|
|
+ increase the IP address.
|
|
|
+ example: 192.168.1.1, increased by 1: 192.168.1.2
|
|
|
+ example: 192.168.1.254, increased by 2: 192.168.2.1
|
|
|
+ :param base_ip_str: the IP to be increased
|
|
|
+ :param increase: the amount of increase
|
|
|
+ :return: the new IP address, in String
|
|
|
+ """
|
|
|
+ base_ip = base_ip_str.split(".")
|
|
|
+ new_ip = [int(base_ip[0]), int(base_ip[1]), int(base_ip[2]), int(base_ip[3])]
|
|
|
+ new_ip[3] = new_ip[3] + increase
|
|
|
+ for index in reversed(range(0, 4)):
|
|
|
+ if new_ip[index] > 255:
|
|
|
+ new_ip[index - 1] += (new_ip[index] / 256)
|
|
|
+ new_ip[index] %= 256
|
|
|
+ return "{0}.{1}.{2}.{3}".format(new_ip[0], new_ip[1], new_ip[2], new_ip[3])
|
|
|
+
|
|
|
+ def _scp_upload(self, vm_external_ip):
|
|
|
+ """
|
|
|
+ upload all the code in a VM
|
|
|
+ :param vm_external_ip: the external IP of the VM
|
|
|
+ :return: None
|
|
|
+ """
|
|
|
+ # upload necessary file to VM
|
|
|
+ vm_directory = "{0}@{1}:{2}".format(Config.ATTRIBUTES["vm_user"], vm_external_ip,
|
|
|
+ Config.ATTRIBUTES["vm_code_directory"])
|
|
|
+ vm_key = Config.ATTRIBUTES["vm_key_file"]
|
|
|
+
|
|
|
+ upload_return_code = 0
|
|
|
+ with open(os.devnull, 'w') as shutup:
|
|
|
+ upload_return_code = subprocess.call(["scp", "-o", "StrictHostKeyChecking=no", "-i",
|
|
|
+ vm_key, "-r", ".", vm_directory],
|
|
|
+ stdout=shutup, stderr=shutup)
|
|
|
+ if upload_return_code == 0:
|
|
|
+ Log.write("VM ", vm_external_ip, " file upload succeed")
|
|
|
+ else:
|
|
|
+ Log.write("VM ", vm_external_ip, " file upload fail")
|
|
|
+
|
|
|
+ def run_cluster(self, server_weave_ip, server_external_ip):
|
|
|
+ """
|
|
|
+ Run all Ambari-agents and Ambari-server in the cluster in parallel
|
|
|
+ Wait until all processes finish
|
|
|
+ :param server_weave_ip: the Weave IP of Ambari-server
|
|
|
+ :param server_external_ip: the external IP of Ambari-server
|
|
|
+ :return: None
|
|
|
+ """
|
|
|
+ process_list = {}
|
|
|
+ process_list.update(self.run_ambari_server_asyn())
|
|
|
+ process_list.update(self.run_service_server_asyn(server_weave_ip, server_external_ip))
|
|
|
+ process_list.update(self.run_docker_on_cluster_asyn(server_weave_ip, server_external_ip))
|
|
|
+
|
|
|
+ terminate_state_list = {}
|
|
|
+ for hostname in process_list:
|
|
|
+ terminate_state_list[hostname] = False
|
|
|
+
|
|
|
+ Log.write("Wait for all VMs to finish configuration ... ...")
|
|
|
+
|
|
|
+ # Wait for all configuration subprocesses
|
|
|
+ while True:
|
|
|
+ all_finished = True
|
|
|
+ for hostname in process_list:
|
|
|
+ output_file, output_file_path, process = process_list[hostname]
|
|
|
+ if terminate_state_list[hostname] is False:
|
|
|
+ all_finished = False
|
|
|
+ returncode = process.poll()
|
|
|
+ if returncode is None:
|
|
|
+ continue
|
|
|
+ else:
|
|
|
+ Log.write("VM ", hostname, " configuration completed, return code: ", str(returncode) \
|
|
|
+ , ", output file path: ", output_file_path)
|
|
|
+ terminate_state_list[hostname] = True
|
|
|
+ output_file.close()
|
|
|
+ else:
|
|
|
+ pass
|
|
|
+ if all_finished:
|
|
|
+ break
|
|
|
+ time.sleep(5)
|
|
|
+
|
|
|
+ Log.write("All VM configuration completed.")
|
|
|
+
|
|
|
+ def run_ambari_server_asyn(self):
|
|
|
+ """
|
|
|
+ Run configuration for Ambari-server in this cluster
|
|
|
+ Set up Ambari-server and Weave network
|
|
|
+ The method is NON-BLOCK
|
|
|
+ :return: a map of tuple, the key of the map is the host name of the VM,
|
|
|
+ the tuple has 3 elements: the file handler of the output of the VM,
|
|
|
+ the file path of the output of the VM,
|
|
|
+ and the process object of configuration for the VM
|
|
|
+ """
|
|
|
+ process_list = {}
|
|
|
+
|
|
|
+ for vm in self.ambari_server_vm:
|
|
|
+ vm_external_ip = vm.external_ip
|
|
|
+ self._scp_upload(vm_external_ip)
|
|
|
+
|
|
|
+ vm_output_file_path = vm.get_ssh_output_file_path()
|
|
|
+ vm_output_file = open(vm_output_file_path, "w")
|
|
|
+
|
|
|
+ # ssh install server
|
|
|
+ vm_ssh_login = "{0}@{1}".format(Config.ATTRIBUTES["vm_user"], vm_external_ip)
|
|
|
+ vm_ssh_cd_cmd = "cd {0}".format(Config.ATTRIBUTES["vm_code_directory"])
|
|
|
+ vm_ssh_python_cmd = "python launcher_ambari_server.py {0}".format(self.cluster_name)
|
|
|
+ vm_ssh_cmd = "{0};{1}".format(vm_ssh_cd_cmd, vm_ssh_python_cmd)
|
|
|
+ vm_key = Config.ATTRIBUTES["vm_key_file"]
|
|
|
+ Log.write(vm_ssh_python_cmd)
|
|
|
+
|
|
|
+ process = subprocess.Popen(["ssh", "-o", "StrictHostKeyChecking=no", "-t", "-i", vm_key,
|
|
|
+ vm_ssh_login, vm_ssh_cmd],
|
|
|
+ stdout=vm_output_file, stderr=vm_output_file)
|
|
|
+ process_list[vm.hostname] = (vm_output_file, vm_output_file_path, process)
|
|
|
+ Log.write("Configuring VM ", vm.hostname, " ... ...")
|
|
|
+ return process_list
|
|
|
+
|
|
|
+ def run_service_server_asyn(self, server_weave_ip, server_external_ip):
|
|
|
+ """
|
|
|
+ Run configuration, set up Ambari-agent in this VM, and the Weave network
|
|
|
+ :param server_weave_ip: the Weave IP of the Ambari-server
|
|
|
+ :param server_external_ip: the external IP of the Ambari-server
|
|
|
+ The method is NON-BLOCK
|
|
|
+ :return: a map of tuple, the key of the map is the host name of the VM,
|
|
|
+ the tuple has 3 elements: the file handler of the output of the VM,
|
|
|
+ the file path of the output of the VM,
|
|
|
+ and the process object of configuration for the VM
|
|
|
+ """
|
|
|
+ process_list = {}
|
|
|
+
|
|
|
+ for vm in self.service_server_vm_list:
|
|
|
+ vm_external_ip = vm.external_ip
|
|
|
+ self._scp_upload(vm_external_ip)
|
|
|
+
|
|
|
+ vm_output_file_path = vm.get_ssh_output_file_path()
|
|
|
+ vm_output_file = open(vm_output_file_path, "w")
|
|
|
+
|
|
|
+ # ssh install server
|
|
|
+ vm_ssh_login = "{0}@{1}".format(Config.ATTRIBUTES["vm_user"], vm_external_ip)
|
|
|
+ vm_ssh_cd_cmd = "cd {0}".format(Config.ATTRIBUTES["vm_code_directory"])
|
|
|
+ vm_ssh_python_cmd = "python launcher_service_server.py {0} {1} {2} {3}".format(
|
|
|
+ vm_external_ip, server_weave_ip, server_external_ip, self.cluster_name)
|
|
|
+ vm_ssh_cmd = "{0};{1}".format(vm_ssh_cd_cmd, vm_ssh_python_cmd)
|
|
|
+ vm_key = Config.ATTRIBUTES["vm_key_file"]
|
|
|
+ Log.write(vm_ssh_python_cmd)
|
|
|
+
|
|
|
+ process = subprocess.Popen(["ssh", "-o", "StrictHostKeyChecking=no", "-t", "-i", vm_key,
|
|
|
+ vm_ssh_login, vm_ssh_cmd],
|
|
|
+ stdout=vm_output_file, stderr=vm_output_file)
|
|
|
+
|
|
|
+ process_list[vm.hostname] = (vm_output_file, vm_output_file_path, process)
|
|
|
+ Log.write("Configuring VM ", vm.hostname, " ... ...")
|
|
|
+ return process_list
|
|
|
+
|
|
|
+ def run_docker_on_cluster_asyn(self, server_weave_ip, server_external_ip):
|
|
|
+ """
|
|
|
+ Run configuration, set up Docker and Weave network
|
|
|
+ run all containers, each with Ambari-agent inside
|
|
|
+ :param server_weave_ip: the Weave IP of the Ambari-server
|
|
|
+ :param server_external_ip: the external IP of the Ambari-server
|
|
|
+ The method is NON-BLOCK
|
|
|
+ :return: a map of tuple, the key of the map is the host name of the VM,
|
|
|
+ the tuple has 3 elements: the file handler of the output of the VM,
|
|
|
+ the file path of the output of the VM,
|
|
|
+ and the process object of configuration for the VM
|
|
|
+ """
|
|
|
+ process_list = {}
|
|
|
+
|
|
|
+ for vm in self.ambari_agent_vm_list:
|
|
|
+ vm_external_ip = vm.external_ip
|
|
|
+ self._scp_upload(vm_external_ip)
|
|
|
+
|
|
|
+ vm_output_file_path = vm.get_ssh_output_file_path()
|
|
|
+ vm_output_file = open(vm_output_file_path, "w")
|
|
|
+
|
|
|
+ vm_ssh_login = "{0}@{1}".format(Config.ATTRIBUTES["vm_user"], vm_external_ip)
|
|
|
+ vm_ssh_cd_cmd = "cd {0}".format(Config.ATTRIBUTES["vm_code_directory"])
|
|
|
+ vm_ssh_python_cmd = "python launcher_docker.py {0} {1} {2} {3}".format(
|
|
|
+ vm_external_ip, server_weave_ip, server_external_ip, self.cluster_name)
|
|
|
+ vm_ssh_cmd = "{0};{1}".format(vm_ssh_cd_cmd, vm_ssh_python_cmd)
|
|
|
+ vm_key = Config.ATTRIBUTES["vm_key_file"]
|
|
|
+ Log.write(vm_ssh_python_cmd)
|
|
|
+
|
|
|
+ process = subprocess.Popen(["ssh", "-o", "StrictHostKeyChecking=no", "-t", "-i", vm_key,
|
|
|
+ vm_ssh_login, vm_ssh_cmd],
|
|
|
+ stdout=vm_output_file, stderr=vm_output_file)
|
|
|
+
|
|
|
+ process_list[vm.hostname] = (vm_output_file, vm_output_file_path, process)
|
|
|
+ Log.write("Configuring VM ", vm.hostname, " ... ...")
|
|
|
+
|
|
|
+ return process_list
|