123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489 |
- #!/usr/bin/env python
- '''
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- '''
- import optparse
- import sys
- import os
- import logging
- import tempfile
- import urllib2
- import socket
- import json
- import base64
- import time
- AMBARI_HOSTNAME = None
- AMBARI_PORT = 8080
- CLUSTER_NAME = None
- PROTOCOL = "http"
- USERNAME = "admin"
- PASSWORD = "admin"
- DEFAULT_TIMEOUT = 10 # seconds
- START_ON_RELOCATE = False
- # Supported Actions
- RELOCATE_ACTION = 'relocate'
- ALLOWED_ACTUAL_STATES_FOR_RELOCATE = [ 'INIT', 'UNKNOWN', 'MAINTENANCE', 'UNINSTALLED' ]
- ALLOWED_HOST_STATUS_FOR_RELOCATE = [ 'HEALTHY' ]
- STATUS_WAIT_TIMEOUT = 120 # seconds
- STATUS_CHECK_INTERVAL = 10 # seconds
- # API calls
- GET_CLUSTERS_URI = "/api/v1/clusters/"
- GET_HOST_COMPONENTS_URI = "/api/v1/clusters/{0}/services/{1}/components/{2}" +\
- "?fields=host_components"
- GET_HOST_COMPONENT_DESIRED_STATE_URI = "/api/v1/clusters/{0}/hosts/{1}" +\
- "/host_components/{2}" +\
- "?fields=HostRoles/desired_state"
- GET_HOST_COMPONENT_STATE_URI = "/api/v1/clusters/{0}/hosts/{1}" +\
- "/host_components/{2}" +\
- "?fields=HostRoles/state"
- GET_HOST_STATE_URL = "/api/v1/clusters/{0}/hosts/{1}?fields=Hosts/host_state"
- HOST_COMPONENT_URI = "/api/v1/clusters/{0}/hosts/{1}/host_components/{2}"
- ADD_HOST_COMPONENT_URI = "/api/v1/clusters/{0}/hosts?Hosts/host_name={1}"
- logger = logging.getLogger()
- class PreemptiveBasicAuthHandler(urllib2.BaseHandler):
- def __init__(self):
- password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
- password_mgr.add_password(None, getUrl(''), USERNAME, PASSWORD)
- self.passwd = password_mgr
- self.add_password = self.passwd.add_password
- def http_request(self, req):
- uri = req.get_full_url()
- user = USERNAME
- pw = PASSWORD
- raw = "%s:%s" % (user, pw)
- auth = 'Basic %s' % base64.b64encode(raw).strip()
- req.add_unredirected_header('Authorization', auth)
- return req
- class AmbariResource:
- def __init__(self, serviceName, componentName):
- self.serviveName = serviceName
- self.componentName = componentName
- self.isInitialized = False
- def initializeResource(self):
- global CLUSTER_NAME
- if CLUSTER_NAME is None:
- CLUSTER_NAME = self.findClusterName()
- if self.serviveName is None:
- raise Exception('Service name undefined')
- if self.componentName is None:
- raise Exception('Component name undefined')
- handler = PreemptiveBasicAuthHandler()
- opener = urllib2.build_opener(handler)
- # Install opener for all requests
- urllib2.install_opener(opener)
- self.urlOpener = opener
- self.old_hostname = self.getHostname()
- self.isInitialized = True
- def relocate(self, new_hostname):
- if not self.isInitialized:
- raise Exception('Resource not initialized')
- # If old and new hostname are the same exit harmlessly
- if self.old_hostname == new_hostname:
- logger.error('New hostname is same as existing host name, %s' % self.old_hostname)
- sys.exit(2)
- pass
- try:
- self.verifyHostComponentStatus(self.old_hostname, new_hostname, self.componentName)
- except Exception, e:
- logger.error("Exception caught on verify relocate request.")
- logger.error(e.message)
- sys.exit(3)
- # Put host component in Maintenance state
- self.updateHostComponentStatus(self.old_hostname, self.componentName,
- "Maintenance", "MAINTENANCE")
- # Delete current host component
- self.deleteHostComponent(self.old_hostname, self.componentName)
- # Add component on the new host
- self.addHostComponent(new_hostname, self.componentName)
- # Install host component
- self.updateHostComponentStatus(new_hostname, self.componentName,
- "Installing", "INSTALLED")
- # Wait on install
- self.waitOnHostComponentUpdate(new_hostname, self.componentName,
- "INSTALLED")
- if START_ON_RELOCATE:
- # Start host component
- self.updateHostComponentStatus(new_hostname, self.componentName,
- "Starting", "STARTED")
- # Wait on start
- self.waitOnHostComponentUpdate(new_hostname, self.componentName, "STARTED")
- pass
- pass
- def waitOnHostComponentUpdate(self, hostname, componentName, status):
- logger.info("Waiting for host component status to update ...")
- sleep_itr = 0
- state = None
- while sleep_itr < STATUS_WAIT_TIMEOUT:
- try:
- state = self.getHostComponentState(hostname, componentName)
- if status == state:
- logger.info("Status update successful. status: %s" % state)
- return
- pass
- except Exception, e:
- logger.error("Caught an exception waiting for status update.. "
- "continuing to wait...")
- pass
- time.sleep(STATUS_CHECK_INTERVAL)
- sleep_itr += STATUS_CHECK_INTERVAL
- pass
- if state and state != status:
- logger.error("Timed out on wait, status unchanged. status = %s" % state)
- sys.exit(1)
- pass
- pass
- def addHostComponent(self, hostname, componentName):
- data = '{"host_components":[{"HostRoles":{"component_name":"%s"}}]}' % self.componentName
- req = urllib2.Request(getUrl(ADD_HOST_COMPONENT_URI.format(CLUSTER_NAME,
- hostname)), data)
- req.add_header("X-Requested-By", "ambari_probe")
- req.get_method = lambda: 'POST'
- try:
- logger.info("Adding host component: %s" % req.get_full_url())
- resp = self.urlOpener.open(req)
- self.logResponse('Add host component response: ', resp)
- except Exception, e:
- logger.error('Create host component failed, component: {0}, host: {1}'
- .format(componentName, hostname))
- logger.error(e)
- raise e
- pass
- def deleteHostComponent(self, hostname, componentName):
- req = urllib2.Request(getUrl(HOST_COMPONENT_URI.format(CLUSTER_NAME,
- hostname, componentName)))
- req.add_header("X-Requested-By", "ambari_probe")
- req.get_method = lambda: 'DELETE'
- try:
- logger.info("Deleting host component: %s" % req.get_full_url())
- resp = self.urlOpener.open(req)
- self.logResponse('Delete component response: ', resp)
- except Exception, e:
- logger.error('Delete {0} failed.'.format(componentName))
- logger.error(e)
- raise e
- pass
- def updateHostComponentStatus(self, hostname, componentName, contextStr, status):
- # Update host component
- data = '{"RequestInfo":{"context":"%s %s"},"Body":{"HostRoles":{"state":"%s"}}}' % (contextStr, self.componentName, status)
- req = urllib2.Request(getUrl(HOST_COMPONENT_URI.format(CLUSTER_NAME,
- hostname, componentName)), data)
- req.add_header("X-Requested-By", "ambari_probe")
- req.get_method = lambda: 'PUT'
- try:
- logger.info("%s host component: %s" % (contextStr, req.get_full_url()))
- resp = self.urlOpener.open(req)
- self.logResponse('Update host component response: ', resp)
- except Exception, e:
- logger.error('Update Status {0} failed.'.format(componentName))
- logger.error(e)
- raise e
- pass
- def verifyHostComponentStatus(self, old_hostname, new_hostname, componentName):
- # Check desired state of host component is not STOPPED or host is
- # unreachable
- actualState = self.getHostComponentState(old_hostname, componentName)
- if actualState not in ALLOWED_ACTUAL_STATES_FOR_RELOCATE:
- raise Exception('Aborting relocate action since host component '
- 'state is %s' % actualState)
- hostState = self.getHostSatus(new_hostname)
- if hostState not in ALLOWED_HOST_STATUS_FOR_RELOCATE:
- raise Exception('Aborting relocate action since host state is %s' % hostState)
- pass
- def getHostSatus(self, hostname):
- hostStateUrl = getUrl(GET_HOST_STATE_URL.format(CLUSTER_NAME, hostname))
- logger.info("Requesting host status: %s " % hostStateUrl)
- urlResponse = self.urlOpener.open(hostStateUrl)
- state = None
- if urlResponse:
- response = urlResponse.read()
- data = json.loads(response)
- logger.debug('Response from getHostSatus: %s' % data)
- if data:
- try:
- hostsInfo = data.get('Hosts')
- if not hostsInfo:
- raise Exception('Cannot find host state for host: {1}'.format(hostname))
- state = hostsInfo.get('host_state')
- except Exception, e:
- logger.error('Unable to parse json data. %s' % data)
- raise e
- pass
- else:
- logger.error("Unable to retrieve host state.")
- pass
- return state
- def getHostComponentState(self, hostname, componentName):
- hostStatusUrl = getUrl(GET_HOST_COMPONENT_STATE_URI.format(CLUSTER_NAME,
- hostname, componentName))
- logger.info("Requesting host component state: %s " % hostStatusUrl)
- urlResponse = self.urlOpener.open(hostStatusUrl)
- state = None
- if urlResponse:
- response = urlResponse.read()
- data = json.loads(response)
- logger.debug('Response from getHostComponentState: %s' % data)
- if data:
- try:
- hostRoles = data.get('HostRoles')
- if not hostRoles:
- raise Exception('Cannot find host component state for component: ' +\
- '{0}, host: {1}'.format(componentName, hostname))
- state = hostRoles.get('state')
- except Exception, e:
- logger.error('Unable to parse json data. %s' % data)
- raise e
- pass
- else:
- logger.error("Unable to retrieve host component desired state.")
- pass
- return state
- # Log response for PUT, POST or DELETE
- def logResponse(self, text=None, response=None):
- if response is not None:
- resp = str(response.getcode())
- if text is None:
- text = 'Logging response from server: '
- if resp is not None:
- logger.info(text + resp)
- def findClusterName(self):
- clusterUrl = getUrl(GET_CLUSTERS_URI)
- clusterName = None
- logger.info("Requesting clusters: " + clusterUrl)
- urlResponse = self.urlOpener.open(clusterUrl)
- if urlResponse is not None:
- response = urlResponse.read()
- data = json.loads(response)
- logger.debug('Response from findClusterName: %s' % data)
- if data:
- try:
- clusters = data.get('items')
- if len(clusters) > 1:
- raise Exception('Multiple clusters found. %s' % clusters)
- clusterName = clusters[0].get('Clusters').get('cluster_name')
- except Exception, e:
- logger.error('Unable to parse json data. %s' % data)
- raise e
- pass
- else:
- logger.error("Unable to retrieve clusters data.")
- pass
- return clusterName
- def getHostname(self):
- hostsUrl = getUrl(GET_HOST_COMPONENTS_URI.format(CLUSTER_NAME,
- self.serviveName, self.componentName))
- logger.info("Requesting host info: " + hostsUrl)
- urlResponse = self.urlOpener.open(hostsUrl)
- hostname = None
- if urlResponse is not None:
- response = urlResponse.read()
- data = json.loads(response)
- logger.debug('Response from getHostname: %s' % data)
- if data:
- try:
- hostRoles = data.get('host_components')
- if not hostRoles:
- raise Exception('Cannot find host component data for service: ' +\
- '{0}, component: {1}'.format(self.serviveName, self.componentName))
- if len(hostRoles) > 1:
- raise Exception('More than one hosts found with the same role')
- hostname = hostRoles[0].get('HostRoles').get('host_name')
- except Exception, e:
- logger.error('Unable to parse json data. %s' % data)
- raise e
- pass
- else:
- logger.error("Unable to retrieve host component data.")
- pass
- return hostname
- def getUrl(partial_url):
- return PROTOCOL + "://" + AMBARI_HOSTNAME + ":" + AMBARI_PORT + partial_url
- def get_supported_actions():
- return [ RELOCATE_ACTION ]
- #
- # Main.
- #
- def main():
- tempDir = tempfile.gettempdir()
- outputFile = os.path.join(tempDir, "ambari_reinstall_probe.out")
- parser = optparse.OptionParser(usage="usage: %prog [options]")
- parser.set_description('This python program is a Ambari thin client and '
- 'supports relocation of ambari host components on '
- 'Ambari managed clusters.')
- parser.add_option("-v", "--verbose", dest="verbose", action="store_false",
- default=False, help="output verbosity.")
- parser.add_option("-s", "--host", dest="server_hostname",
- help="Ambari server host name.")
- parser.add_option("-p", "--port", dest="server_port",
- default="8080" ,help="Ambari server port. [default: 8080]")
- parser.add_option("-r", "--protocol", dest="protocol", default = "http",
- help="Protocol for communicating with Ambari server ("
- "http/https) [default: http].")
- parser.add_option("-c", "--cluster-name", dest="cluster_name",
- help="Ambari cluster to operate on.")
- parser.add_option("-e", "--service-name", dest="service_name",
- help="Ambari Service to which the component belongs to.")
- parser.add_option("-m", "--component-name", dest="component_name",
- help="Ambari Service Component to operate on.")
- parser.add_option("-n", "--new-host", dest="new_hostname",
- help="New host to relocate the component to.")
- parser.add_option("-a", "--action", dest="action", default = "relocate",
- help="Script action. [default: relocate]")
- parser.add_option("-o", "--output-file", dest="outputfile",
- default = outputFile, metavar="FILE",
- help="Output file. [default: %s]" % outputFile)
- parser.add_option("-u", "--username", dest="username",
- default="admin" ,help="Ambari server admin user. [default: admin]")
- parser.add_option("-w", "--password", dest="password",
- default="admin" ,help="Ambari server admin password.")
- parser.add_option("-d", "--start-component", dest="start_component",
- action="store_false", default=False,
- help="Should the script start the component after relocate.")
- (options, args) = parser.parse_args()
- # set verbose
- if options.verbose:
- logging.basicConfig(level=logging.DEBUG)
- else:
- logging.basicConfig(level=logging.INFO)
- global AMBARI_HOSTNAME
- AMBARI_HOSTNAME = options.server_hostname
- global AMBARI_PORT
- AMBARI_PORT = options.server_port
- global CLUSTER_NAME
- CLUSTER_NAME = options.cluster_name
- global PROTOCOL
- PROTOCOL = options.protocol
- global USERNAME
- USERNAME = options.username
- global PASSWORD
- PASSWORD = options.password
- global START_ON_RELOCATE
- START_ON_RELOCATE = options.start_component
- global logger
- logger = logging.getLogger('AmbariProbe')
- handler = logging.FileHandler(options.outputfile)
- formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
- handler.setFormatter(formatter)
- logger.addHandler(handler)
- action = RELOCATE_ACTION
- if options.action is not None:
- if options.action not in get_supported_actions():
- logger.error("Unsupported action: " + options.action + ", "
- "valid actions: " + str(get_supported_actions()))
- sys.exit(1)
- else:
- action = options.action
- socket.setdefaulttimeout(DEFAULT_TIMEOUT)
- ambariResource = AmbariResource(serviceName=options.service_name,
- componentName=options.component_name)
- ambariResource.initializeResource()
- if action == RELOCATE_ACTION:
- if options.new_hostname is not None:
- ambariResource.relocate(options.new_hostname)
- if __name__ == "__main__":
- try:
- main()
- except (KeyboardInterrupt, EOFError):
- print("\nAborting ... Keyboard Interrupt.")
- sys.exit(1)
|