|
@@ -0,0 +1,462 @@
|
|
|
+#!/usr/bin/env python
|
|
|
+
|
|
|
+'''
|
|
|
+Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+or more contributor license agreements. See the NOTICE file
|
|
|
+distributed with this work for additional information
|
|
|
+regarding copyright ownership. The ASF licenses this file
|
|
|
+to you under the Apache License, Version 2.0 (the
|
|
|
+"License"); you may not use this file except in compliance
|
|
|
+with the License. You may obtain a copy of the License at
|
|
|
+
|
|
|
+ http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+
|
|
|
+Unless required by applicable law or agreed to in writing, software
|
|
|
+distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+See the License for the specific language governing permissions and
|
|
|
+limitations under the License.
|
|
|
+'''
|
|
|
+
|
|
|
+import optparse
|
|
|
+import sys
|
|
|
+import os
|
|
|
+import logging
|
|
|
+import tempfile
|
|
|
+import urllib2
|
|
|
+import socket
|
|
|
+import json
|
|
|
+import base64
|
|
|
+import time
|
|
|
+import StringIO
|
|
|
+
|
|
|
+AMBARI_HOSTNAME = None
|
|
|
+AMBARI_PORT = 8080
|
|
|
+CLUSTER_NAME = None
|
|
|
+PROTOCOL = "http"
|
|
|
+USERNAME = "admin"
|
|
|
+PASSWORD = "admin"
|
|
|
+DEFAULT_TIMEOUT = 10 # seconds
|
|
|
+
|
|
|
+# Supported Actions
|
|
|
+RELOCATE_ACTION = 'relocate'
|
|
|
+ALLOWED_ACTUAL_STATES_FOR_RELOCATE = [ 'INIT', 'UNKNOWN', 'MAINTENANCE', 'UNINSTALLED' ]
|
|
|
+ALLOWED_HOST_STATUS_FOR_RELOCATE = [ 'HEALTHY' ]
|
|
|
+STATUS_WAIT_TIMEOUT = 120 # seconds
|
|
|
+STATUS_CHECK_INTERVAL = 10 # seconds
|
|
|
+
|
|
|
+# API calls
|
|
|
+GET_CLUSTERS_URI = "/api/v1/clusters/"
|
|
|
+GET_HOST_COMPONENTS_URI = "/api/v1/clusters/{0}/services/{1}/components/{2}" +\
|
|
|
+ "?fields=host_components"
|
|
|
+GET_HOST_COMPONENT_DESIRED_STATE_URI = "/api/v1/clusters/{0}/hosts/{1}" +\
|
|
|
+ "/host_components/{2}" +\
|
|
|
+ "?fields=HostRoles/desired_state"
|
|
|
+GET_HOST_COMPONENT_STATE_URI = "/api/v1/clusters/{0}/hosts/{1}" +\
|
|
|
+ "/host_components/{2}" +\
|
|
|
+ "?fields=HostRoles/state"
|
|
|
+GET_HOST_STATE_URL = "/api/v1/clusters/{0}/hosts/{1}?fields=Hosts/host_state"
|
|
|
+HOST_COMPONENT_URI = "/api/v1/clusters/{0}/hosts/{1}/host_components/{2}"
|
|
|
+ADD_HOST_COMPONENT_URI = "/api/v1/clusters/{0}/hosts?Hosts/host_name={1}"
|
|
|
+
|
|
|
+logger = logging.getLogger()
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+class PreemptiveBasicAuthHandler(urllib2.BaseHandler):
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
|
|
|
+ password_mgr.add_password(None, getUrl(''), USERNAME, PASSWORD)
|
|
|
+ self.passwd = password_mgr
|
|
|
+ self.add_password = self.passwd.add_password
|
|
|
+
|
|
|
+ def http_request(self, req):
|
|
|
+ uri = req.get_full_url()
|
|
|
+ user = USERNAME
|
|
|
+ pw = PASSWORD
|
|
|
+ raw = "%s:%s" % (user, pw)
|
|
|
+ auth = 'Basic %s' % base64.b64encode(raw).strip()
|
|
|
+ req.add_unredirected_header('Authorization', auth)
|
|
|
+ return req
|
|
|
+
|
|
|
+
|
|
|
+class AmbariResource:
|
|
|
+
|
|
|
+ def __init__(self, serviceName, componentName):
|
|
|
+ self.serviveName = serviceName
|
|
|
+ self.componentName = componentName
|
|
|
+ self.isInitialized = False
|
|
|
+
|
|
|
+ def initializeResource(self):
|
|
|
+ global CLUSTER_NAME
|
|
|
+ if CLUSTER_NAME is None:
|
|
|
+ CLUSTER_NAME = self.findClusterName()
|
|
|
+
|
|
|
+ if self.serviveName is None:
|
|
|
+ raise Exception('Service name undefined')
|
|
|
+
|
|
|
+ if self.componentName is None:
|
|
|
+ raise Exception('Component name undefined')
|
|
|
+
|
|
|
+ handler = PreemptiveBasicAuthHandler()
|
|
|
+ opener = urllib2.build_opener(handler)
|
|
|
+ # Install opener for all requests
|
|
|
+ urllib2.install_opener(opener)
|
|
|
+ self.urlOpener = opener
|
|
|
+
|
|
|
+ self.old_hostname = self.getHostname()
|
|
|
+
|
|
|
+ self.isInitialized = True
|
|
|
+
|
|
|
+
|
|
|
+ def relocate(self, new_hostname):
|
|
|
+ if not self.isInitialized:
|
|
|
+ raise Exception('Resource not initialized')
|
|
|
+
|
|
|
+ if self.old_hostname == new_hostname:
|
|
|
+ raise Exception('New hostname is same as existing host name, %s' % self.old_hostname)
|
|
|
+
|
|
|
+ self.verifyHostComponentStatus(self.old_hostname, new_hostname, self.componentName)
|
|
|
+
|
|
|
+ # Delete current host component
|
|
|
+ self.deleteHostComponent(self.old_hostname, self.componentName)
|
|
|
+
|
|
|
+ # Add component on the new host
|
|
|
+ self.addHostComponent(new_hostname, self.componentName)
|
|
|
+
|
|
|
+ # Install host component
|
|
|
+ self.updateHostComponentStatus(new_hostname, self.componentName,
|
|
|
+ "Installing", "INSTALLED")
|
|
|
+
|
|
|
+ # Wait on install
|
|
|
+ self.waitOnHostComponentUpdate(new_hostname, self.componentName,
|
|
|
+ "INSTALLED")
|
|
|
+
|
|
|
+ # Start host component
|
|
|
+ self.updateHostComponentStatus(new_hostname, self.componentName,
|
|
|
+ "Starting", "STARTED")
|
|
|
+
|
|
|
+ # Wait on start
|
|
|
+ self.waitOnHostComponentUpdate(new_hostname, self.componentName, "STARTED")
|
|
|
+ pass
|
|
|
+
|
|
|
+ def waitOnHostComponentUpdate(self, hostname, componentName, status):
|
|
|
+ logger.info("Waiting for host component status to update ...")
|
|
|
+ sleep_itr = 0
|
|
|
+ while sleep_itr < STATUS_WAIT_TIMEOUT:
|
|
|
+ try:
|
|
|
+ state = self.getHostComponentState(hostname, componentName)
|
|
|
+ if status == state:
|
|
|
+ logger.info("Status update successful. status: %s" % state)
|
|
|
+ return
|
|
|
+ pass
|
|
|
+ except Exception, e:
|
|
|
+ logger.error("Caught an exception waiting for status update.. "
|
|
|
+ "continuing to wait...")
|
|
|
+
|
|
|
+ time.sleep(STATUS_CHECK_INTERVAL)
|
|
|
+ sleep_itr += STATUS_CHECK_INTERVAL
|
|
|
+ pass
|
|
|
+
|
|
|
+ def addHostComponent(self, hostname, componentName):
|
|
|
+ data = '{"host_components":[{"HostRoles":{"component_name":"%s"}}]}' % self.componentName
|
|
|
+ req = urllib2.Request(getUrl(ADD_HOST_COMPONENT_URI.format(CLUSTER_NAME,
|
|
|
+ hostname)), data)
|
|
|
+
|
|
|
+ req.add_header("X-Requested-By", "ambari_probe")
|
|
|
+ req.get_method = lambda: 'POST'
|
|
|
+ try:
|
|
|
+ logger.info("Adding host component: %s" % req.get_full_url())
|
|
|
+ resp = self.urlOpener.open(req)
|
|
|
+ self.logResponse('Add host component response: ', resp)
|
|
|
+ except Exception, e:
|
|
|
+ logger.error('Create host component failed, component: {0}, host: {1}'
|
|
|
+ .format(componentName, hostname))
|
|
|
+ logger.error(e)
|
|
|
+ raise e
|
|
|
+ pass
|
|
|
+
|
|
|
+ def deleteHostComponent(self, hostname, componentName):
|
|
|
+ req = urllib2.Request(getUrl(HOST_COMPONENT_URI.format(CLUSTER_NAME,
|
|
|
+ hostname, componentName)))
|
|
|
+ req.add_header("X-Requested-By", "ambari_probe")
|
|
|
+ req.get_method = lambda: 'DELETE'
|
|
|
+ try:
|
|
|
+ logger.info("Deleting host component: %s" % req.get_full_url())
|
|
|
+ resp = self.urlOpener.open(req)
|
|
|
+ self.logResponse('Delete component response: ', resp)
|
|
|
+ except Exception, e:
|
|
|
+ logger.error('Delete {0} failed.'.format(componentName))
|
|
|
+ logger.error(e)
|
|
|
+ raise e
|
|
|
+ pass
|
|
|
+
|
|
|
+ def updateHostComponentStatus(self, hostname, componentName, contextStr, status):
|
|
|
+ # Update host component
|
|
|
+ data = '{"RequestInfo":{"context":"%s %s"},"Body":{"HostRoles":{"state":"%s"}}}' % (contextStr, self.componentName, status)
|
|
|
+ req = urllib2.Request(getUrl(HOST_COMPONENT_URI.format(CLUSTER_NAME,
|
|
|
+ hostname, componentName)), data)
|
|
|
+ req.add_header("X-Requested-By", "ambari_probe")
|
|
|
+ req.get_method = lambda: 'PUT'
|
|
|
+ try:
|
|
|
+ logger.info("%s host component: %s" % (contextStr, req.get_full_url()))
|
|
|
+ resp = self.urlOpener.open(req)
|
|
|
+ self.logResponse('Start component response: ', resp)
|
|
|
+ except Exception, e:
|
|
|
+ logger.error('Update Status {0} failed.'.format(componentName))
|
|
|
+ logger.error(e)
|
|
|
+ raise e
|
|
|
+ pass
|
|
|
+
|
|
|
+ def verifyHostComponentStatus(self, old_hostname, new_hostname, componentName):
|
|
|
+ # Check desired state of host component is not STOPPED or host is
|
|
|
+ # unreachable
|
|
|
+ actualState = self.getHostComponentState(old_hostname, componentName)
|
|
|
+
|
|
|
+ if actualState not in ALLOWED_ACTUAL_STATES_FOR_RELOCATE:
|
|
|
+ raise Exception('Aborting relocate action since host component '
|
|
|
+ 'state is %s' % actualState)
|
|
|
+
|
|
|
+ hostState = self.getHostSatus(new_hostname)
|
|
|
+ if hostState not in ALLOWED_HOST_STATUS_FOR_RELOCATE:
|
|
|
+ raise Exception('Aborting relocate action since host state is %s' % hostState)
|
|
|
+
|
|
|
+ pass
|
|
|
+
|
|
|
+ def getHostSatus(self, hostname):
|
|
|
+ hostStateUrl = getUrl(GET_HOST_STATE_URL.format(CLUSTER_NAME, hostname))
|
|
|
+
|
|
|
+ logger.info("Requesting host status: %s " % hostStateUrl)
|
|
|
+ urlResponse = self.urlOpener.open(hostStateUrl)
|
|
|
+ state = None
|
|
|
+
|
|
|
+ if urlResponse:
|
|
|
+ response = urlResponse.read()
|
|
|
+ data = json.loads(response)
|
|
|
+ logger.debug('Response from getHostSatus: %s' % data)
|
|
|
+ if data:
|
|
|
+ try:
|
|
|
+ hostsInfo = data.get('Hosts')
|
|
|
+ if not hostsInfo:
|
|
|
+ raise Exception('Cannot find host state for host: {1}'.format(hostname))
|
|
|
+
|
|
|
+ state = hostsInfo.get('host_state')
|
|
|
+ except Exception, e:
|
|
|
+ logger.error('Unable to parse json data. %s' % data)
|
|
|
+ raise e
|
|
|
+ pass
|
|
|
+
|
|
|
+ else:
|
|
|
+ logger.error("Unable to retrieve host state.")
|
|
|
+ pass
|
|
|
+
|
|
|
+ return state
|
|
|
+
|
|
|
+
|
|
|
+ def getHostComponentState(self, hostname, componentName):
|
|
|
+ hostStatusUrl = getUrl(GET_HOST_COMPONENT_STATE_URI.format(CLUSTER_NAME,
|
|
|
+ hostname, componentName))
|
|
|
+
|
|
|
+ logger.info("Requesting host component state: %s " % hostStatusUrl)
|
|
|
+ urlResponse = self.urlOpener.open(hostStatusUrl)
|
|
|
+ state = None
|
|
|
+
|
|
|
+ if urlResponse:
|
|
|
+ response = urlResponse.read()
|
|
|
+ data = json.loads(response)
|
|
|
+ logger.debug('Response from getHostComponentState: %s' % data)
|
|
|
+ if data:
|
|
|
+ try:
|
|
|
+ hostRoles = data.get('HostRoles')
|
|
|
+ if not hostRoles:
|
|
|
+ raise Exception('Cannot find host component state for component: ' +\
|
|
|
+ '{0}, host: {1}'.format(componentName, hostname))
|
|
|
+
|
|
|
+ state = hostRoles.get('state')
|
|
|
+ except Exception, e:
|
|
|
+ logger.error('Unable to parse json data. %s' % data)
|
|
|
+ raise e
|
|
|
+ pass
|
|
|
+
|
|
|
+ else:
|
|
|
+ logger.error("Unable to retrieve host component desired state.")
|
|
|
+ pass
|
|
|
+
|
|
|
+ return state
|
|
|
+
|
|
|
+
|
|
|
+ # Log response for PUT, POST or DELETE
|
|
|
+ def logResponse(self, text=None, response=None):
|
|
|
+ if response is not None:
|
|
|
+ resp = str(response.getcode())
|
|
|
+ if text is None:
|
|
|
+ text = 'Logging response from server: '
|
|
|
+ if resp is not None:
|
|
|
+ logger.info(text + resp)
|
|
|
+
|
|
|
+ def findClusterName(self):
|
|
|
+ clusterUrl = getUrl(GET_CLUSTERS_URI)
|
|
|
+ clusterName = None
|
|
|
+
|
|
|
+ logger.info("Requesting clusters: " + clusterUrl)
|
|
|
+ urlResponse = self.urlOpener.open(clusterUrl)
|
|
|
+ if urlResponse is not None:
|
|
|
+ response = urlResponse.read()
|
|
|
+ data = json.loads(response)
|
|
|
+ logger.debug('Response from findClusterName: %s' % data)
|
|
|
+ if data:
|
|
|
+ try:
|
|
|
+ clusters = data.get('items')
|
|
|
+ if len(clusters) > 1:
|
|
|
+ raise Exception('Multiple clusters found. %s' % clusters)
|
|
|
+
|
|
|
+ clusterName = clusters[0].get('Clusters').get('cluster_name')
|
|
|
+ except Exception, e:
|
|
|
+ logger.error('Unable to parse json data. %s' % data)
|
|
|
+ raise e
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ logger.error("Unable to retrieve clusters data.")
|
|
|
+ pass
|
|
|
+
|
|
|
+ return clusterName
|
|
|
+
|
|
|
+ def getHostname(self):
|
|
|
+ hostsUrl = getUrl(GET_HOST_COMPONENTS_URI.format(CLUSTER_NAME,
|
|
|
+ self.serviveName, self.componentName))
|
|
|
+
|
|
|
+ logger.info("Requesting host info: " + hostsUrl)
|
|
|
+ urlResponse = self.urlOpener.open(hostsUrl)
|
|
|
+ hostname = None
|
|
|
+
|
|
|
+ if urlResponse is not None:
|
|
|
+ response = urlResponse.read()
|
|
|
+ data = json.loads(response)
|
|
|
+ logger.debug('Response from getHostname: %s' % data)
|
|
|
+ if data:
|
|
|
+ try:
|
|
|
+ hostRoles = data.get('host_components')
|
|
|
+ if not hostRoles:
|
|
|
+ raise Exception('Cannot find host component data for service: ' +\
|
|
|
+ '{0}, component: {1}'.format(self.serviveName, self.componentName))
|
|
|
+ if len(hostRoles) > 1:
|
|
|
+ raise Exception('More than one hosts found with the same role')
|
|
|
+
|
|
|
+ hostname = hostRoles[0].get('HostRoles').get('host_name')
|
|
|
+ except Exception, e:
|
|
|
+ logger.error('Unable to parse json data. %s' % data)
|
|
|
+ raise e
|
|
|
+ pass
|
|
|
+
|
|
|
+ else:
|
|
|
+ logger.error("Unable to retrieve host component data.")
|
|
|
+ pass
|
|
|
+
|
|
|
+ return hostname
|
|
|
+
|
|
|
+
|
|
|
+def getUrl(partial_url):
|
|
|
+ return PROTOCOL + "://" + AMBARI_HOSTNAME + ":" + AMBARI_PORT + partial_url
|
|
|
+
|
|
|
+def get_supported_actions():
|
|
|
+ return [ RELOCATE_ACTION ]
|
|
|
+
|
|
|
+#
|
|
|
+# Main.
|
|
|
+#
|
|
|
+def main():
|
|
|
+ tempDir = tempfile.gettempdir()
|
|
|
+ outputFile = os.path.join(tempDir, "ambari_reinstall_probe.out")
|
|
|
+
|
|
|
+ parser = optparse.OptionParser(usage="usage: %prog [options]")
|
|
|
+ parser.set_description('This python program is a Ambari thin client and '
|
|
|
+ 'supports relocation of ambari host components on '
|
|
|
+ 'Ambari managed clusters.')
|
|
|
+
|
|
|
+ parser.add_option("-v", "--verbose", dest="verbose", action="store_false",
|
|
|
+ default=False, help="output verbosity.")
|
|
|
+ parser.add_option("-s", "--host", dest="server_hostname",
|
|
|
+ help="Ambari server host name.")
|
|
|
+ parser.add_option("-p", "--port", dest="server_port",
|
|
|
+ default="8080" ,help="Ambari server port. [default: 8080]")
|
|
|
+ parser.add_option("-r", "--protocol", dest="protocol", default = "http",
|
|
|
+ help="Protocol for communicating with Ambari server ("
|
|
|
+ "http/https) [default: http].")
|
|
|
+ parser.add_option("-c", "--cluster-name", dest="cluster_name",
|
|
|
+ help="Ambari cluster to operate on.")
|
|
|
+ parser.add_option("-e", "--service-name", dest="service_name",
|
|
|
+ help="Ambari Service to which the component belongs to.")
|
|
|
+ parser.add_option("-m", "--component-name", dest="component_name",
|
|
|
+ help="Ambari Service Component to operate on.")
|
|
|
+ parser.add_option("-n", "--new-host", dest="new_hostname",
|
|
|
+ help="New host to relocate the component to.")
|
|
|
+ parser.add_option("-a", "--action", dest="action", default = "relocate",
|
|
|
+ help="Script action. [default: relocate]")
|
|
|
+ parser.add_option("-o", "--output-file", dest="outputfile",
|
|
|
+ default = outputFile, metavar="FILE",
|
|
|
+ help="Output file. [default: %s]" % outputFile)
|
|
|
+ parser.add_option("-u", "--username", dest="username",
|
|
|
+ default="admin" ,help="Ambari server admin user. [default: admin]")
|
|
|
+ parser.add_option("-w", "--password", dest="password",
|
|
|
+ default="admin" ,help="Ambari server admin password.")
|
|
|
+
|
|
|
+ (options, args) = parser.parse_args()
|
|
|
+
|
|
|
+ # set verbose
|
|
|
+ if options.verbose:
|
|
|
+ logging.basicConfig(level=logging.DEBUG)
|
|
|
+ else:
|
|
|
+ logging.basicConfig(level=logging.INFO)
|
|
|
+
|
|
|
+ global AMBARI_HOSTNAME
|
|
|
+ AMBARI_HOSTNAME = options.server_hostname
|
|
|
+
|
|
|
+ global AMBARI_PORT
|
|
|
+ AMBARI_PORT = options.server_port
|
|
|
+
|
|
|
+ global CLUSTER_NAME
|
|
|
+ CLUSTER_NAME = options.cluster_name
|
|
|
+
|
|
|
+ global PROTOCOL
|
|
|
+ PROTOCOL = options.protocol
|
|
|
+
|
|
|
+ global USERNAME
|
|
|
+ USERNAME = options.username
|
|
|
+
|
|
|
+ global PASSWORD
|
|
|
+ PASSWORD = options.password
|
|
|
+
|
|
|
+ global logger
|
|
|
+ logger = logging.getLogger('AmbariProbe')
|
|
|
+ handler = logging.FileHandler(options.outputfile)
|
|
|
+ formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
|
|
|
+ handler.setFormatter(formatter)
|
|
|
+ logger.addHandler(handler)
|
|
|
+
|
|
|
+ action = RELOCATE_ACTION
|
|
|
+
|
|
|
+ if options.action is not None:
|
|
|
+ if options.action not in get_supported_actions():
|
|
|
+ logger.error("Unsupported action: " + options.action + ", "
|
|
|
+ "valid actions: " + str(get_supported_actions()))
|
|
|
+ sys.exit(1)
|
|
|
+ else:
|
|
|
+ action = options.action
|
|
|
+
|
|
|
+ socket.setdefaulttimeout(DEFAULT_TIMEOUT)
|
|
|
+
|
|
|
+ ambariResource = AmbariResource(serviceName=options.service_name,
|
|
|
+ componentName=options.component_name)
|
|
|
+ ambariResource.initializeResource()
|
|
|
+
|
|
|
+ if action == RELOCATE_ACTION:
|
|
|
+ if options.new_hostname is not None:
|
|
|
+ ambariResource.relocate(options.new_hostname)
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ try:
|
|
|
+ main()
|
|
|
+ except (KeyboardInterrupt, EOFError):
|
|
|
+ print("\nAborting ... Keyboard Interrupt.")
|
|
|
+ sys.exit(1)
|