|
@@ -87,7 +87,8 @@ class hodState:
|
|
os.remove(item)
|
|
os.remove(item)
|
|
|
|
|
|
class hodRunner:
|
|
class hodRunner:
|
|
- def __init__(self, cfg):
|
|
|
|
|
|
+
|
|
|
|
+ def __init__(self, cfg, log=None, cluster=None):
|
|
self.__hodhelp = hodHelp()
|
|
self.__hodhelp = hodHelp()
|
|
self.__ops = self.__hodhelp.ops
|
|
self.__ops = self.__hodhelp.ops
|
|
self.__cfg = cfg
|
|
self.__cfg = cfg
|
|
@@ -96,14 +97,22 @@ class hodRunner:
|
|
self.__user = getpass.getuser()
|
|
self.__user = getpass.getuser()
|
|
self.__registry = None
|
|
self.__registry = None
|
|
self.__baseLogger = None
|
|
self.__baseLogger = None
|
|
- self.__setup_logger()
|
|
|
|
|
|
+ # Allowing to pass in log object to help testing - a stub can be passed in
|
|
|
|
+ if log is None:
|
|
|
|
+ self.__setup_logger()
|
|
|
|
+ else:
|
|
|
|
+ self.__log = log
|
|
|
|
|
|
self.__userState = hodState(self.__cfg['hod']['user_state'])
|
|
self.__userState = hodState(self.__cfg['hod']['user_state'])
|
|
|
|
|
|
self.__clusterState = None
|
|
self.__clusterState = None
|
|
self.__clusterStateInfo = { 'env' : None, 'hdfs' : None, 'mapred' : None }
|
|
self.__clusterStateInfo = { 'env' : None, 'hdfs' : None, 'mapred' : None }
|
|
|
|
|
|
- self.__cluster = hadoopCluster(self.__cfg, self.__log)
|
|
|
|
|
|
+ # Allowing to pass in log object to help testing - a stib can be passed in
|
|
|
|
+ if cluster is None:
|
|
|
|
+ self.__cluster = hadoopCluster(self.__cfg, self.__log)
|
|
|
|
+ else:
|
|
|
|
+ self.__cluster = cluster
|
|
|
|
|
|
def __setup_logger(self):
|
|
def __setup_logger(self):
|
|
self.__baseLogger = hodlib.Common.logger.hodLog('hod')
|
|
self.__baseLogger = hodlib.Common.logger.hodLog('hod')
|
|
@@ -206,6 +215,12 @@ class hodRunner:
|
|
self.__opCode = 3
|
|
self.__opCode = 3
|
|
return
|
|
return
|
|
|
|
|
|
|
|
+ clusterList = self.__userState.read(CLUSTER_DATA_FILE)
|
|
|
|
+ if clusterDir in clusterList.keys():
|
|
|
|
+ self.__log.critical("Found a previously allocated cluster at cluster directory '%s'. Deallocate the cluster first." % (clusterDir))
|
|
|
|
+ self.__opCode = 12
|
|
|
|
+ return
|
|
|
|
+
|
|
self.__setup_cluster_logger(clusterDir)
|
|
self.__setup_cluster_logger(clusterDir)
|
|
if re.match('\d+-\d+', nodes):
|
|
if re.match('\d+-\d+', nodes):
|
|
(min, max) = nodes.split("-")
|
|
(min, max) = nodes.split("-")
|
|
@@ -292,8 +307,7 @@ class hodRunner:
|
|
self.__setup_cluster_state(clusterDir)
|
|
self.__setup_cluster_state(clusterDir)
|
|
clusterInfo = self.__clusterState.read()
|
|
clusterInfo = self.__clusterState.read()
|
|
if clusterInfo == {}:
|
|
if clusterInfo == {}:
|
|
- self.__opCode = 15
|
|
|
|
- self.__log.critical("Cluster %s not allocated." % clusterDir)
|
|
|
|
|
|
+ self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True)
|
|
else:
|
|
else:
|
|
self.__opCode = \
|
|
self.__opCode = \
|
|
self.__cluster.deallocate(clusterDir, clusterInfo)
|
|
self.__cluster.deallocate(clusterDir, clusterInfo)
|
|
@@ -302,9 +316,7 @@ class hodRunner:
|
|
self.__clusterState.clear()
|
|
self.__clusterState.clear()
|
|
self.__remove_cluster(clusterDir)
|
|
self.__remove_cluster(clusterDir)
|
|
else:
|
|
else:
|
|
- self.__log.critical("Invalid cluster directory '%s' specified." %
|
|
|
|
- clusterDir)
|
|
|
|
- self.__opCode = 3
|
|
|
|
|
|
+ self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True)
|
|
else:
|
|
else:
|
|
print self.__hodhelp.help_deallocate()
|
|
print self.__hodhelp.help_deallocate()
|
|
self.__log.critical("%s operation requires one argument. " % operation
|
|
self.__log.critical("%s operation requires one argument. " % operation
|
|
@@ -314,8 +326,15 @@ class hodRunner:
|
|
def _op_list(self, args):
|
|
def _op_list(self, args):
|
|
clusterList = self.__userState.read(CLUSTER_DATA_FILE)
|
|
clusterList = self.__userState.read(CLUSTER_DATA_FILE)
|
|
for path in clusterList.keys():
|
|
for path in clusterList.keys():
|
|
|
|
+ if not os.path.isdir(path):
|
|
|
|
+ self.__log.info("cluster state unknown\t%s\t%s" % (clusterList[path], path))
|
|
|
|
+ continue
|
|
self.__setup_cluster_state(path)
|
|
self.__setup_cluster_state(path)
|
|
clusterInfo = self.__clusterState.read()
|
|
clusterInfo = self.__clusterState.read()
|
|
|
|
+ if clusterInfo == {}:
|
|
|
|
+ # something wrong with the cluster directory.
|
|
|
|
+ self.__log.info("cluster state unknown\t%s\t%s" % (clusterList[path], path))
|
|
|
|
+ continue
|
|
clusterStatus = self.__cluster.check_cluster(clusterInfo)
|
|
clusterStatus = self.__cluster.check_cluster(clusterInfo)
|
|
if clusterStatus == 12:
|
|
if clusterStatus == 12:
|
|
self.__log.info("alive\t%s\t%s" % (clusterList[path], path))
|
|
self.__log.info("alive\t%s\t%s" % (clusterList[path], path))
|
|
@@ -334,34 +353,51 @@ class hodRunner:
|
|
if os.path.isdir(clusterDir):
|
|
if os.path.isdir(clusterDir):
|
|
self.__setup_cluster_state(clusterDir)
|
|
self.__setup_cluster_state(clusterDir)
|
|
clusterInfo = self.__clusterState.read()
|
|
clusterInfo = self.__clusterState.read()
|
|
- clusterStatus = self.__cluster.check_cluster(clusterInfo)
|
|
|
|
- if clusterStatus == 12:
|
|
|
|
- self.__print_cluster_info(clusterInfo)
|
|
|
|
- self.__log.info("hadoop-site.xml at %s" % clusterDir)
|
|
|
|
- elif clusterStatus == 10:
|
|
|
|
- self.__log.critical("%s cluster is dead" % clusterDir)
|
|
|
|
- elif clusterStatus == 13:
|
|
|
|
- self.__log.warn("%s cluster hdfs is dead" % clusterDir)
|
|
|
|
- elif clusterStatus == 14:
|
|
|
|
- self.__log.warn("%s cluster mapred is dead" % clusterDir)
|
|
|
|
-
|
|
|
|
- if clusterStatus != 12:
|
|
|
|
- if clusterStatus == 15:
|
|
|
|
- self.__log.critical("Cluster %s not allocated." % clusterDir)
|
|
|
|
- else:
|
|
|
|
|
|
+ if clusterInfo == {}:
|
|
|
|
+ # something wrong with the cluster directory.
|
|
|
|
+ self.__handle_invalid_cluster_directory(clusterDir)
|
|
|
|
+ else:
|
|
|
|
+ clusterStatus = self.__cluster.check_cluster(clusterInfo)
|
|
|
|
+ if clusterStatus == 12:
|
|
self.__print_cluster_info(clusterInfo)
|
|
self.__print_cluster_info(clusterInfo)
|
|
self.__log.info("hadoop-site.xml at %s" % clusterDir)
|
|
self.__log.info("hadoop-site.xml at %s" % clusterDir)
|
|
|
|
+ elif clusterStatus == 10:
|
|
|
|
+ self.__log.critical("%s cluster is dead" % clusterDir)
|
|
|
|
+ elif clusterStatus == 13:
|
|
|
|
+ self.__log.warn("%s cluster hdfs is dead" % clusterDir)
|
|
|
|
+ elif clusterStatus == 14:
|
|
|
|
+ self.__log.warn("%s cluster mapred is dead" % clusterDir)
|
|
|
|
+
|
|
|
|
+ if clusterStatus != 12:
|
|
|
|
+ if clusterStatus == 15:
|
|
|
|
+ self.__log.critical("Cluster %s not allocated." % clusterDir)
|
|
|
|
+ else:
|
|
|
|
+ self.__print_cluster_info(clusterInfo)
|
|
|
|
+ self.__log.info("hadoop-site.xml at %s" % clusterDir)
|
|
|
|
|
|
- self.__opCode = clusterStatus
|
|
|
|
|
|
+ self.__opCode = clusterStatus
|
|
else:
|
|
else:
|
|
- self.__log.critical("'%s' does not exist." % clusterDir)
|
|
|
|
- self.__opCode = 3
|
|
|
|
|
|
+ self.__handle_invalid_cluster_directory(clusterDir)
|
|
else:
|
|
else:
|
|
print self.__hodhelp.help_info()
|
|
print self.__hodhelp.help_info()
|
|
self.__log.critical("%s operation requires one argument. " % operation
|
|
self.__log.critical("%s operation requires one argument. " % operation
|
|
+ "A cluster path.")
|
|
+ "A cluster path.")
|
|
self.__opCode = 3
|
|
self.__opCode = 3
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ def __handle_invalid_cluster_directory(self, clusterDir, cleanUp=False):
|
|
|
|
+ clusterList = self.__userState.read(CLUSTER_DATA_FILE)
|
|
|
|
+ if clusterDir in clusterList.keys():
|
|
|
|
+ # previously allocated cluster.
|
|
|
|
+ self.__log.critical("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (clusterList[clusterDir], clusterDir))
|
|
|
|
+ if cleanUp:
|
|
|
|
+ self.__cluster.delete_job(clusterList[clusterDir])
|
|
|
|
+ self.__remove_cluster(clusterDir)
|
|
|
|
+ self.__log.critical("Freeing resources allocated to the cluster.")
|
|
|
|
+ self.__opCode = 3
|
|
|
|
+ else:
|
|
|
|
+ self.__log.critical("'%s' is not a valid cluster directory." % (clusterDir))
|
|
|
|
+ self.__opCode = 15
|
|
|
|
+
|
|
def __print_cluster_info(self, clusterInfo):
|
|
def __print_cluster_info(self, clusterInfo):
|
|
keys = clusterInfo.keys()
|
|
keys = clusterInfo.keys()
|
|
|
|
|