123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- #!/usr/bin/env python2.6
- '''
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- '''
- import logging
- import logging.handlers
- import signal
- import json
- import socket
- import sys, traceback
- import time
- import threading
- import urllib2
- from urllib2 import Request, urlopen, URLError
- import httplib
- import ssl
- import AmbariConfig
- import pprint
- import ProcessHelper
- from Heartbeat import Heartbeat
- from Register import Register
- from ActionQueue import ActionQueue
- from optparse import OptionParser
- from wsgiref.simple_server import ServerHandler
- import security
- from NetUtil import NetUtil
- from random import randrange, randint
- logger = logging.getLogger()
- class Controller(threading.Thread):
- def __init__(self, config, range=120):
- threading.Thread.__init__(self)
- logger.debug('Initializing Controller RPC thread.')
- self.lock = threading.Lock()
- self.safeMode = True
- self.credential = None
- self.config = config
- self.hostname = socket.gethostname()
- server_secured_url = 'https://' + config.get('server', 'hostname') + ':' + config.get('server', 'secured_url_port')
- self.registerUrl = server_secured_url + '/agent/v1/register/' + self.hostname
- self.heartbeatUrl = server_secured_url + '/agent/v1/heartbeat/' + self.hostname
- self.netutil = NetUtil()
- self.responseId = -1
- self.repeatRegistration = False
- self.cachedconnect = None
- self.range = range
- def start(self):
- self.actionQueue = ActionQueue(self.config)
- self.actionQueue.start()
- self.register = Register()
- self.heartbeat = Heartbeat(self.actionQueue)
- pass
-
- def __del__(self):
- logger.info("Server connection disconnected.")
- pass
-
- def registerWithServer(self):
- retry=False
- firstTime=True
- registered=False
- id = -1
- ret = {}
- while not registered:
- try:
- data = json.dumps(self.register.build(id))
- logger.info("Registering with the server " + pprint.pformat(data))
- response = self.sendRequest(self.registerUrl, data)
- ret = json.loads(response)
- logger.info("Registered with the server with " + pprint.pformat(ret))
- print("Registered with the server")
- self.responseId= int(ret['responseId'])
- registered = True
- if 'statusCommands' in ret.keys():
- logger.info("Got status commands on registration " + pprint.pformat(ret['statusCommands']) )
- self.addToQueue(ret['statusCommands'])
- pass
- pass
- except Exception, err:
- # try a reconnect only after a certain amount of random time
- delay = randint(0, self.range)
- logger.info("Unable to connect to: " + self.registerUrl, exc_info = True)
- """ Sleeping for {0} seconds and then retrying again """.format(delay)
- time.sleep(delay)
- pass
- pass
- return ret
-
-
- def addToQueue(self, commands):
- """Add to the queue for running the commands """
- """ Put the required actions into the Queue """
- """ Verify if the action is to reboot or not """
- if not commands:
- logger.info("No commands from the server : " + pprint.pformat(commands))
- else:
- """Only add to the queue if not empty list """
- for command in commands:
- logger.info("Adding command to the action queue: \n" +
- pprint.pformat(command))
- self.actionQueue.put(command)
- pass
- pass
- pass
- # For testing purposes
- DEBUG_HEARTBEAT_RETRIES = 0
- DEBUG_SUCCESSFULL_HEARTBEATS = 0
- DEBUG_STOP_HEARTBITTING = False
- def heartbeatWithServer(self):
- self.DEBUG_HEARTBEAT_RETRIES = 0
- self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
- retry = False
- certVerifFailed = False
- config = AmbariConfig.config
- hb_interval = config.get('heartbeat', 'state_interval')
- #TODO make sure the response id is monotonically increasing
- id = 0
- while not self.DEBUG_STOP_HEARTBITTING:
- try:
- if not retry:
- data = json.dumps(self.heartbeat.build(self.responseId, int(hb_interval)))
- pass
- else:
- self.DEBUG_HEARTBEAT_RETRIES += 1
- response = self.sendRequest(self.heartbeatUrl, data)
- response = json.loads(response)
- logger.info('Got server response: ' + pprint.pformat(response))
-
- serverId=int(response['responseId'])
- if 'registrationCommand' in response.keys():
- # check if the registration command is None. If none skip
- if response['registrationCommand'] is not None:
- logger.info("RegistrationCommand received - repeat agent registration")
- self.repeatRegistration = True
- return
- if serverId!=self.responseId+1:
- logger.error("Error in responseId sequence - restarting")
- self.restartAgent()
- else:
- self.responseId=serverId
- if 'executionCommands' in response.keys():
- self.addToQueue(response['executionCommands'])
- pass
- if 'statusCommands' in response.keys():
- self.addToQueue(response['statusCommands'])
- pass
- if "true" == response['restartAgent']:
- logger.error("Got restartAgent command")
- self.restartAgent()
- else:
- logger.info("No commands sent from the Server.")
- pass
- if retry:
- print("Reconnected to the server")
- logger.info("Reconnected to the server")
- retry=False
- certVerifFailed = False
- self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
- self.DEBUG_HEARTBEAT_RETRIES = 0
- except Exception, err:
- #randomize the heartbeat
- delay = randint(0, self.range)
- time.sleep(delay)
- if "code" in err:
- logger.error(err.code)
- else:
- logger.error("Unable to connect to: " + self.heartbeatUrl + " due to " + str(err))
- logger.debug("Details: " + str(err), exc_info=True)
- if not retry:
- print("Connection to the server was lost. Reconnecting...")
- if 'certificate verify failed' in str(err) and not certVerifFailed:
- print("Server certificate verify failed. Did you regenerate server certificate?")
- certVerifFailed = True
- self.cachedconnect = None # Previous connection is broken now
- retry=True
- if self.actionQueue.isIdle():
- time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
- else:
- time.sleep(self.netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC)
- pass
- def run(self):
- opener = urllib2.build_opener()
- urllib2.install_opener(opener)
- while True:
- self.repeatRegistration = False
- self.registerAndHeartbeat()
- if not self.repeatRegistration:
- break
- pass
- def registerAndHeartbeat(self):
- registerResponse = self.registerWithServer()
- message = registerResponse['response']
- logger.info("Response from server = " + message)
- time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
- self.heartbeatWithServer()
- def restartAgent(self):
- ProcessHelper.restartAgent()
- pass
- def sendRequest(self, url, data):
- if self.cachedconnect is None: # Lazy initialization
- self.cachedconnect = security.CachedHTTPSConnection(self.config)
- req = urllib2.Request(url, data, {'Content-Type': 'application/json'})
- response = self.cachedconnect.request(req)
- return response
- def main(argv=None):
- # Allow Ctrl-C
- signal.signal(signal.SIGINT, signal.SIG_DFL)
- logger.setLevel(logging.INFO)
- formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - \
- %(message)s")
- stream_handler = logging.StreamHandler()
- stream_handler.setFormatter(formatter)
- logger.addHandler(stream_handler)
- logger.info('Starting Server RPC Thread: %s' % ' '.join(sys.argv))
- config = AmbariConfig.config
- collector = Controller(config)
- collector.start()
- collector.run()
- if __name__ == '__main__':
- main()
|