|
@@ -29,7 +29,9 @@ import threading
|
|
|
import urllib2
|
|
|
from urllib2 import Request, urlopen, URLError
|
|
|
import AmbariConfig
|
|
|
+import pprint
|
|
|
from Heartbeat import Heartbeat
|
|
|
+from Register import Register
|
|
|
from ActionQueue import ActionQueue
|
|
|
from optparse import OptionParser
|
|
|
from wsgiref.simple_server import ServerHandler
|
|
@@ -50,48 +52,88 @@ class Controller(threading.Thread):
|
|
|
# self.credential = { 'user' : config.get('controller', 'user'),
|
|
|
# 'password' : config.get('controller', 'password')
|
|
|
# }
|
|
|
- self.url = config.get('server', 'url') + '/agent/register/' + socket.gethostname()
|
|
|
-
|
|
|
+ self.hostname = socket.gethostname()
|
|
|
+ self.registerUrl = config.get('server', 'url') + \
|
|
|
+ '/agent/register/' + self.hostname
|
|
|
+ self.heartbeatUrl = config.get('server', 'url') + \
|
|
|
+ '/agent/heartbeat/' + self.hostname
|
|
|
+
|
|
|
def start(self):
|
|
|
self.actionQueue = ActionQueue(self.config)
|
|
|
self.actionQueue.start()
|
|
|
+ self.register = Register()
|
|
|
self.heartbeat = Heartbeat(self.actionQueue)
|
|
|
-
|
|
|
+ pass
|
|
|
+
|
|
|
def __del__(self):
|
|
|
logger.info("Server connection disconnected.")
|
|
|
-
|
|
|
- def run(self):
|
|
|
- id='-1'
|
|
|
- opener = urllib2.build_opener()
|
|
|
- urllib2.install_opener(opener)
|
|
|
+ pass
|
|
|
+
|
|
|
+ def registerWithServer(self):
|
|
|
retry=False
|
|
|
firstTime=True
|
|
|
+ registered=False
|
|
|
+ id = -1
|
|
|
+ ret = {}
|
|
|
+ while registered == False:
|
|
|
+ try:
|
|
|
+ data = json.dumps(self.register.build(id))
|
|
|
+ req = urllib2.Request(self.registerUrl, data, {'Content-Type':
|
|
|
+ 'application/json'})
|
|
|
+ stream = urllib2.urlopen(req)
|
|
|
+ response = stream.read()
|
|
|
+ stream.close()
|
|
|
+ ret = json.loads(response)
|
|
|
+ logger.info("Registered with the server with " + pprint.pformat(ret))
|
|
|
+ registered = True
|
|
|
+ pass
|
|
|
+ except Exception, err:
|
|
|
+ logger.info("Unable to connect to: " + self.registerUrl, exc_info = True)
|
|
|
+ """ sleep for 30 seconds and then retry again """
|
|
|
+ time.sleep(30)
|
|
|
+ pass
|
|
|
+ pass
|
|
|
+ return ret
|
|
|
+
|
|
|
+ def heartbeatWithServer(self):
|
|
|
+ retry = False
|
|
|
+ #TODO make sure the response id is monotonically increasing
|
|
|
+ id = 0
|
|
|
while True:
|
|
|
try:
|
|
|
if retry==False:
|
|
|
data = json.dumps(self.heartbeat.build(id))
|
|
|
- req = urllib2.Request(self.url, data, {'Content-Type': 'application/json'})
|
|
|
+ req = urllib2.Request(self.heartbeatUrl, data, {'Content-Type': 'application/json'})
|
|
|
f = urllib2.urlopen(req)
|
|
|
response = f.read()
|
|
|
f.close()
|
|
|
data = json.loads(response)
|
|
|
id=int(data['responseId'])
|
|
|
- self.actionQueue.put(data)
|
|
|
- if retry==True or firstTime==True:
|
|
|
- logger.info("Controller connection established")
|
|
|
- firstTime=False
|
|
|
+ logger.info("HeartBeat Response from Server: \n" + pprint.pformat(data))
|
|
|
retry=False
|
|
|
except Exception, err:
|
|
|
retry=True
|
|
|
if "code" in err:
|
|
|
logger.error(err.code)
|
|
|
else:
|
|
|
- logger.error("Unable to connect to: "+self.url,exc_info=True)
|
|
|
+ logger.error("Unable to connect to: "+self.heartbeatUrl,exc_info=True)
|
|
|
if self.actionQueue.isIdle():
|
|
|
time.sleep(30)
|
|
|
else:
|
|
|
- time.sleep(1)
|
|
|
+ time.sleep(1)
|
|
|
+ pass
|
|
|
+
|
|
|
|
|
|
+ def run(self):
|
|
|
+ opener = urllib2.build_opener()
|
|
|
+ urllib2.install_opener(opener)
|
|
|
+
|
|
|
+ registerResponse = self.registerWithServer()
|
|
|
+ message = registerResponse['response']
|
|
|
+ logger.info("Response from server = " + message)
|
|
|
+ self.heartbeatWithServer()
|
|
|
+ pass
|
|
|
+
|
|
|
def main(argv=None):
|
|
|
# Allow Ctrl-C
|
|
|
signal.signal(signal.SIGINT, signal.SIG_DFL)
|