@@ -182,22 +182,26 @@ class hadoopCluster:
     return serviceData
 
   def __check_job_status(self):
-    initWaitCount = 20
-    count = 0
+    failureCount = 0
    status = False
    state = 'Q'
-    while state == 'Q':
+    while (state=='Q') or (state==False):
      if hodInterrupt.isSet():
        raise HodInterruptException()
 
      state = self.__nodePool.getJobState()
-      if (state==False) or (state!='Q'):
+      self.__log.debug('job state %s' % state)
+      if state == False:
+        failureCount += 1
+        if (failureCount >= self.__cfg['hod']['job-status-query-failure-retries']):
+          self.__log.debug('Number of retries reached max limit while querying job status')
+          break
+        time.sleep(self.__cfg['hod']['job-command-failure-interval'])
+      elif state!='Q':
        break
-      count = count + 1
-      if count < initWaitCount:
-        time.sleep(0.5)
      else:
-        time.sleep(10)
+        self.__log.debug('querying for job status after job-status-query-interval')
+        time.sleep(self.__cfg['hod']['job-status-query-interval'])
 
    if state and state != 'C':
      status = True
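
As a rough standalone sketch (not part of the patch), the polling logic that __check_job_status now follows can be summarized as below. wait_while_queued, get_job_state and the bare cfg dict are hypothetical stand-ins for self.__nodePool.getJobState and self.__cfg['hod']; the 'Q'/'C' job states and the three configuration keys are taken from the hunk above, and interrupt handling is omitted.

    import time

    def wait_while_queued(get_job_state, cfg):
      """Poll while the job is queued ('Q'); tolerate a limited number of failed queries."""
      failure_count = 0
      state = 'Q'
      while state == 'Q' or state is False:
        state = get_job_state()   # e.g. 'Q', 'R', 'C', or False if the query itself failed
        if state is False:
          failure_count += 1
          if failure_count >= cfg['job-status-query-failure-retries']:
            break                 # give up after the configured number of failed queries
          time.sleep(cfg['job-command-failure-interval'])
        elif state != 'Q':
          break                   # job has left the queue
        else:
          time.sleep(cfg['job-status-query-interval'])
      # the job counts as alive if a state came back and it is not 'C' (completed)
      return bool(state) and state != 'C'
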
@@ -237,7 +241,7 @@ class hadoopCluster:
        time.sleep(1)
        count = count + 1
        # check to see if the job exited by any chance in that time:
-        if (count % 10 == 0):
+        if (count % self.__cfg['hod']['job-status-query-interval'] == 0):
          if not self.__check_job_status():
            break
 
@@ -256,9 +260,9 @@ class hadoopCluster:
        serviceAddress = xmlrpcClient.getServiceAddr(serviceName)
        if serviceAddress:
          if serviceAddress == 'not found':
-            time.sleep(.5)
+            time.sleep(1)
            # check to see if the job exited by any chance in that time:
-            if (i % 10 == 0):
+            if ((i+1) % self.__cfg['hod']['job-status-query-interval'] == 0):
              if not self.__check_job_status():
                break
          else:
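
Both wait loops above now fold a periodic liveness check into their one-second polling, so they stop waiting once the underlying job is gone. A minimal sketch of that pattern, assuming a hypothetical condition() predicate and job_still_alive() check in place of the registry/service lookups and self.__check_job_status:

    import time

    def wait_for(condition, job_still_alive, query_interval, max_iterations=300):
      """Poll once per second; every query_interval iterations confirm the job still exists."""
      for i in range(max_iterations):
        if condition():
          return True
        time.sleep(1)
        # check to see if the job exited by any chance in that time
        if (i + 1) % query_interval == 0 and not job_still_alive():
          break
      return False
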
@@ -420,6 +424,7 @@ class hadoopCluster:
 
  def allocate(self, clusterDir, min, max=None):
    status = 0
+    failureCount = 0
    self.__svcrgyClient = self.__get_svcrgy_client()
 
    self.__log.debug("allocate %s %s %s" % (clusterDir, min, max))
@@ -432,7 +437,25 @@ class hadoopCluster:
    walltime = None
    if self.__cfg['hod'].has_key('walltime'):
      walltime = self.__cfg['hod']['walltime']
+
    self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet, walltime)
+    # if the job submission returned an error other than no resources
+    # retry a couple of times
+    while (self.jobId is False) and (exitCode != 188):
+      if hodInterrupt.isSet():
+        raise HodInterruptException()
+
+      failureCount += 1
+      if (failureCount >= self.__cfg['hod']['job-status-query-failure-retries']):
+        self.__log.debug("failed submitting job more than the retries. exiting")
+        break
+      else:
+        # wait a bit before retrying
+        time.sleep(self.__cfg['hod']['job-command-failure-interval'])
+        if hodInterrupt.isSet():
+          raise HodInterruptException()
+        self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet, walltime)
+
    if self.jobId:
      try:
        jobStatus = self.__check_job_status()
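
The retry added around submitNodeSet only re-submits when the failure is not the scheduler's "request exceeded maximum resource allocation" rejection (exit code 188, reported separately further down). A hypothetical condensed sketch of the same control flow, with submit() standing in for self.__nodePool.submitNodeSet(nodeSet, walltime) and cfg for self.__cfg['hod'], interrupt handling omitted:

    import time

    def submit_with_retries(submit, cfg):
      """Re-submit a failed job a bounded number of times, except on exit code 188."""
      job_id, exit_code = submit()
      failures = 0
      while job_id is False and exit_code != 188:
        failures += 1
        if failures >= cfg['job-status-query-failure-retries']:
          break                                   # caller logs the failure and gives up
        time.sleep(cfg['job-command-failure-interval'])
        job_id, exit_code = submit()
      return job_id, exit_code
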
@@ -558,12 +581,12 @@ class hadoopCluster:
      if exitCode == 188:
        self.__log.critical("Request execeeded maximum resource allocation.")
      else:
-        self.__log.critical("Insufficient resources available.")
+        self.__log.critical("Job submission failed with exit code %s" % exitCode)
      status = 4
-    else:
+    else:
      self.__log.critical("Scheduler failure, allocation failed.\n\n")
      status = 4
-
+
    return status
 
  def __isRingMasterAlive(self, rmAddr):
|