hadoop.py 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725
#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements. See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership. The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
"""Generate hadoop-site.xml and allocate, inspect and deallocate HOD Hadoop clusters."""
  15. # -*- python -*-
  16. import os, time, sys, shutil, exceptions, re, threading, signal, urllib, pprint, math
  17. from HTMLParser import HTMLParser
  18. import xml.dom.minidom
  19. import xml.dom.pulldom
  20. from xml.dom import getDOMImplementation
  21. from hodlib.Common.util import *
  22. from hodlib.Common.xmlrpc import hodXRClient
  23. from hodlib.Common.miniHTMLParser import miniHTMLParser
  24. from hodlib.Common.nodepoolutil import NodePoolUtil
  25. from hodlib.Common.tcp import tcpError, tcpSocket
  26. reCommandDelimeterString = r"(?<!\\);"
  27. reCommandDelimeter = re.compile(reCommandDelimeterString)
  28. class hadoopConfig:
  29. def __create_xml_element(self, doc, name, value, description, final = False):
  30. prop = doc.createElement("property")
  31. nameP = doc.createElement("name")
  32. string = doc.createTextNode(name)
  33. nameP.appendChild(string)
  34. valueP = doc.createElement("value")
  35. string = doc.createTextNode(value)
  36. valueP.appendChild(string)
  37. if final:
  38. finalP = doc.createElement("final")
  39. string = doc.createTextNode("true")
  40. finalP.appendChild(string)
  41. desc = doc.createElement("description")
  42. string = doc.createTextNode(description)
  43. desc.appendChild(string)
  44. prop.appendChild(nameP)
  45. prop.appendChild(valueP)
  46. if final:
  47. prop.appendChild(finalP)
  48. prop.appendChild(desc)
  49. return prop
  50. def gen_site_conf(self, confDir, tempDir, numNodes, hdfsAddr, mrSysDir,\
  51. mapredAddr=None, clientParams=None, serverParams=None,\
  52. finalServerParams=None, clusterFactor=None):
  53. if not mapredAddr:
  54. mapredAddr = "dummy:8181"
  55. implementation = getDOMImplementation()
  56. doc = implementation.createDocument('', 'configuration', None)
  57. comment = doc.createComment(
  58. "This is an auto generated hadoop-site.xml, do not modify")
  59. topElement = doc.documentElement
  60. topElement.appendChild(comment)
  61. description = {}
  62. paramsDict = { 'mapred.job.tracker' : mapredAddr , \
  63. 'fs.default.name' : "hdfs://" + hdfsAddr, \
  64. 'hadoop.tmp.dir' : tempDir, \
  65. 'dfs.client.buffer.dir' : os.path.join(tempDir, 'dfs',
  66. 'tmp'),
  67. }
  68. paramsDict['mapred.system.dir'] = mrSysDir
  69. # mapred-default.xml is no longer used now.
  70. numred = int(math.floor(clusterFactor * (int(numNodes) - 1)))
  71. paramsDict['mapred.reduce.tasks'] = str(numred)
  72. # end
  73. # for all the above vars generated, set the description
  74. for k, v in paramsDict.iteritems():
  75. description[k] = 'Hod generated parameter'
  76. # finalservelParams
  77. if finalServerParams:
  78. for k, v in finalServerParams.iteritems():
  79. if not description.has_key(k):
  80. description[k] = "final server parameter"
  81. paramsDict[k] = v
  82. # servelParams
  83. if serverParams:
  84. for k, v in serverParams.iteritems():
  85. if not description.has_key(k):
  86. # if no final value for same param is mentioned
  87. description[k] = "server parameter"
  88. paramsDict[k] = v
  89. # clientParams
  90. if clientParams:
  91. for k, v in clientParams.iteritems():
  92. if not description.has_key(k) or description[k] == "server parameter":
  93. # Just add, if no final value for same param is mentioned.
  94. # Replace even if server param is mentioned for same config variable
  95. description[k] = "client-side parameter"
  96. paramsDict[k] = v
  97. # generate the xml elements
  98. for k,v in paramsDict.iteritems():
  99. if ( description[k] == "final server parameter" or \
  100. description[k] == "Hod generated parameter" ):
  101. final = True
  102. else: final = False
  103. prop = self.__create_xml_element(doc, k, v, description[k], final)
  104. topElement.appendChild(prop)
  105. siteName = os.path.join(confDir, "hadoop-site.xml")
  106. sitefile = file(siteName, 'w')
  107. print >> sitefile, topElement.toxml()
  108. sitefile.close()
  109. class hadoopCluster:
  110. def __init__(self, cfg, log):
  111. self.__cfg = cfg
  112. self.__log = log
  113. self.__changedClusterParams = []
  114. self.__hostname = local_fqdn()
  115. self.__svcrgyClient = None
  116. self.__nodePool = NodePoolUtil.getNodePool(self.__cfg['nodepooldesc'],
  117. self.__cfg, self.__log)
  118. self.__hadoopCfg = hadoopConfig()
  119. self.jobId = None
  120. self.mapredInfo = None
  121. self.hdfsInfo = None
  122. self.ringmasterXRS = None
  123. def __get_svcrgy_client(self):
  124. svcrgyUrl = to_http_url(self.__cfg['hod']['xrs-address'])
  125. return hodXRClient(svcrgyUrl)
  126. def __get_service_status(self):
  127. serviceData = self.__get_service_data()
  128. status = True
  129. hdfs = False
  130. mapred = False
  131. for host in serviceData.keys():
  132. for item in serviceData[host]:
  133. service = item.keys()
  134. if service[0] == 'hdfs.grid' and \
  135. self.__cfg['gridservice-hdfs']['external'] == False:
  136. hdfs = True
  137. elif service[0] == 'mapred.grid':
  138. mapred = True
  139. if not mapred:
  140. status = "mapred"
  141. if not hdfs and self.__cfg['gridservice-hdfs']['external'] == False:
  142. if status != True:
  143. status = "mapred and hdfs"
  144. else:
  145. status = "hdfs"
  146. return status
  147. def __get_service_data(self):
  148. registry = to_http_url(self.__cfg['hod']['xrs-address'])
  149. serviceData = self.__svcrgyClient.getServiceInfo(
  150. self.__cfg['hod']['userid'], self.__setup.np.getNodePoolId())
  151. return serviceData
  152. def __check_job_status(self):
  153. initWaitCount = 20
  154. count = 0
  155. status = False
  156. state = 'Q'
  157. userLimitsFirstFlag = True
  158. while state == 'Q':
  159. if hodInterrupt.isSet():
  160. raise HodInterruptException()
  161. jobInfo = self.__nodePool.getJobInfo()
  162. state = jobInfo['job_state']
  163. if (state==False) or (state!='Q'):
  164. break
  165. count = count + 1
  166. if count < initWaitCount:
  167. time.sleep(0.5)
  168. else:
  169. time.sleep(10)
  170. if self.__cfg['hod'].has_key('job-feasibility-attr') and \
  171. self.__cfg['hod']['job-feasibility-attr']:
  172. (status, msg) = self.__isJobFeasible()
  173. if status == "Never":
  174. self.__log.critical(TORQUE_USER_LIMITS_EXCEEDED_MSG + msg + \
  175. "This cluster cannot be allocated now.")
  176. return -1
  177. elif status == False:
  178. if userLimitsFirstFlag:
  179. self.__log.critical(TORQUE_USER_LIMITS_EXCEEDED_MSG + msg + \
  180. "This cluster allocation will succeed only after other " + \
  181. "clusters are deallocated.")
  182. userLimitsFirstFlag = False
  183. if state and state != 'C':
  184. status = True
  185. return status
  186. def __isJobFeasible(self):
  187. return self.__nodePool.isJobFeasible()
  188. def __get_ringmaster_client(self):
  189. ringmasterXRS = None
  190. ringList = self.__svcrgyClient.getServiceInfo(
  191. self.__cfg['ringmaster']['userid'], self.__nodePool.getServiceId(),
  192. 'ringmaster', 'hod')
  193. if ringList and len(ringList):
  194. if isinstance(ringList, list):
  195. ringmasterXRS = ringList[0]['xrs']
  196. else:
  197. count = 0
  198. waitTime = self.__cfg['hod']['allocate-wait-time']
  199. while count < waitTime:
  200. if hodInterrupt.isSet():
  201. raise HodInterruptException()
  202. ringList = self.__svcrgyClient.getServiceInfo(
  203. self.__cfg['ringmaster']['userid'], self.__nodePool.getServiceId(),
  204. 'ringmaster',
  205. 'hod')
  206. if ringList and len(ringList):
  207. if isinstance(ringList, list):
  208. ringmasterXRS = ringList[0]['xrs']
  209. if ringmasterXRS is not None:
  210. break
  211. else:
  212. time.sleep(1)
  213. count = count + 1
  214. # check to see if the job exited by any chance in that time:
  215. if (count % 10 == 0):
  216. if not self.__check_job_status():
  217. break
  218. return ringmasterXRS
  219. def __init_hadoop_service(self, serviceName, xmlrpcClient):
  220. status = True
  221. serviceAddress = None
  222. serviceInfo = None
  223. for i in range(0, 250):
  224. try:
  225. if hodInterrupt.isSet():
  226. raise HodInterruptException()
  227. serviceAddress = xmlrpcClient.getServiceAddr(serviceName)
  228. if serviceAddress:
  229. if serviceAddress == 'not found':
  230. time.sleep(.5)
  231. # check to see if the job exited by any chance in that time:
  232. if (i % 10 == 0):
  233. if not self.__check_job_status():
  234. break
  235. else:
  236. serviceInfo = xmlrpcClient.getURLs(serviceName)
  237. break
  238. except HodInterruptException,h :
  239. raise h
  240. except:
  241. self.__log.critical("'%s': ringmaster xmlrpc error." % serviceName)
  242. self.__log.debug(get_exception_string())
  243. status = False
  244. break
  245. if serviceAddress == 'not found' or not serviceAddress:
  246. self.__log.critical("Failed to retrieve '%s' service address." %
  247. serviceName)
  248. status = False
  249. elif serviceAddress.startswith("Error: "):
  250. errs = serviceAddress[len("Error: "):]
  251. self.__log.critical("Cluster could not be allocated because of the following errors.\n%s" % \
  252. errs)
  253. status = False
  254. else:
  255. try:
  256. self.__svcrgyClient.registerService(self.__cfg['hodring']['userid'],
  257. self.jobId, self.__hostname,
  258. serviceName, 'grid', serviceInfo)
  259. except HodInterruptException, h:
  260. raise h
  261. except:
  262. self.__log.critical("'%s': registry xmlrpc error." % serviceName)
  263. self.__log.debug(get_exception_string())
  264. status = False
  265. return status, serviceAddress, serviceInfo
  266. def __collect_jobtracker_ui(self, dir):
  267. link = self.mapredInfo + "/jobtracker.jsp"
  268. parser = miniHTMLParser()
  269. parser.setBaseUrl(self.mapredInfo)
  270. node_cache = {}
  271. self.__log.debug("collect_jobtracker_ui seeded with " + link)
  272. def alarm_handler(number, stack):
  273. raise AlarmException("timeout")
  274. signal.signal(signal.SIGALRM, alarm_handler)
  275. input = None
  276. while link:
  277. self.__log.debug("link: %s" % link)
  278. # taskstats.jsp,taskdetails.jsp not included since too many to collect
  279. if re.search(
  280. "jobfailures\.jsp|jobtracker\.jsp|jobdetails\.jsp|jobtasks\.jsp",
  281. link):
  282. for i in range(1,5):
  283. if hodInterrupt.isSet():
  284. raise HodInterruptException()
  285. try:
  286. input = urllib.urlopen(link)
  287. break
  288. except:
  289. self.__log.debug(get_exception_string())
  290. time.sleep(1)
  291. if input:
  292. out = None
  293. self.__log.debug("collecting " + link + "...")
  294. filename = re.sub(self.mapredInfo, "", link)
  295. filename = dir + "/" + filename
  296. filename = re.sub("http://","", filename)
  297. filename = re.sub("[\?\&=:]","_",filename)
  298. filename = filename + ".html"
  299. try:
  300. tempdir, tail = os.path.split(filename)
  301. if not os.path.exists(tempdir):
  302. os.makedirs(tempdir)
  303. except:
  304. self.__log.debug(get_exception_string())
  305. out = open(filename, 'w')
  306. bufSz = 8192
  307. signal.alarm(10)
  308. try:
  309. self.__log.debug("Starting to grab: %s" % link)
  310. buf = input.read(bufSz)
  311. while len(buf) > 0:
  312. # Feed the file into the HTML parser
  313. parser.feed(buf)
  314. # Re-write the hrefs in the file
  315. p = re.compile("\?(.+?)=(.+?)")
  316. buf = p.sub(r"_\1_\2",buf)
  317. p= re.compile("&(.+?)=(.+?)")
  318. buf = p.sub(r"_\1_\2",buf)
  319. p = re.compile("http://(.+?):(\d+)?")
  320. buf = p.sub(r"\1_\2/",buf)
  321. buf = re.sub("href=\"/","href=\"",buf)
  322. p = re.compile("href=\"(.+?)\"")
  323. buf = p.sub(r"href=\1.html",buf)
  324. out.write(buf)
  325. buf = input.read(bufSz)
  326. signal.alarm(0)
  327. input.close()
  328. if out:
  329. out.close()
  330. self.__log.debug("Finished grabbing: %s" % link)
  331. except AlarmException:
  332. if hodInterrupt.isSet():
  333. raise HodInterruptException()
  334. if out: out.close()
  335. if input: input.close()
  336. self.__log.debug("Failed to retrieve: %s" % link)
  337. else:
  338. self.__log.debug("Failed to retrieve: %s" % link)
  339. # Get the next link in level traversal order
  340. link = parser.getNextLink()
  341. parser.close()
  342. def check_cluster(self, clusterInfo):
  343. status = 0
  344. if 'mapred' in clusterInfo:
  345. mapredAddress = clusterInfo['mapred'][7:]
  346. hdfsAddress = clusterInfo['hdfs'][7:]
  347. status = get_cluster_status(hdfsAddress, mapredAddress)
  348. if status == 0:
  349. status = 12
  350. else:
  351. status = 15
  352. return status
  353. def is_cluster_deallocated(self, jobId):
  354. """Returns True if the JobId that represents this cluster
  355. is in the Completed or exiting state."""
  356. jobInfo = self.__nodePool.getJobInfo(jobId)
  357. state = None
  358. if jobInfo is not None and jobInfo.has_key('job_state'):
  359. state = jobInfo['job_state']
  360. return ((state == 'C') or (state == 'E'))
  361. def cleanup(self):
  362. if self.__nodePool: self.__nodePool.finalize()
  363. def get_job_id(self):
  364. return self.jobId
  365. def delete_job(self, jobId):
  366. '''Delete a job given it's ID'''
  367. ret = 0
  368. if self.__nodePool:
  369. ret = self.__nodePool.deleteJob(jobId)
  370. else:
  371. raise Exception("Invalid state: Node pool is not initialized to delete the given job.")
  372. return ret
  373. def is_valid_account(self):
  374. """Verify if the account being used to submit the job is a valid account.
  375. This code looks for a file <install-dir>/bin/verify-account.
  376. If the file is present, it executes the file, passing as argument
  377. the account name. It returns the exit code and output from the
  378. script on non-zero exit code."""
  379. accountValidationScript = os.path.abspath('./verify-account')
  380. if not os.path.exists(accountValidationScript):
  381. return (0, None)
  382. account = self.__nodePool.getAccountString()
  383. exitCode = 0
  384. errMsg = None
  385. try:
  386. accountValidationCmd = simpleCommand('Account Validation Command',\
  387. '%s %s' % (accountValidationScript,
  388. account))
  389. accountValidationCmd.start()
  390. accountValidationCmd.wait()
  391. accountValidationCmd.join()
  392. exitCode = accountValidationCmd.exit_code()
  393. self.__log.debug('account validation script is run %d' \
  394. % exitCode)
  395. errMsg = None
  396. if exitCode is not 0:
  397. errMsg = accountValidationCmd.output()
  398. except Exception, e:
  399. exitCode = 0
  400. self.__log.warn('Error executing account script: %s ' \
  401. 'Accounting is disabled.' \
  402. % get_exception_error_string())
  403. self.__log.debug(get_exception_string())
  404. return (exitCode, errMsg)
  405. def allocate(self, clusterDir, min, max=None):
  406. status = 0
  407. self.__svcrgyClient = self.__get_svcrgy_client()
  408. self.__log.debug("allocate %s %s %s" % (clusterDir, min, max))
  409. if min < 3:
  410. self.__log.critical("Minimum nodes must be greater than 2.")
  411. status = 2
  412. else:
  413. nodeSet = self.__nodePool.newNodeSet(min)
  414. walltime = None
  415. if self.__cfg['hod'].has_key('walltime'):
  416. walltime = self.__cfg['hod']['walltime']
  417. self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet, walltime)
  418. if self.jobId:
  419. jobStatus = None
  420. try:
  421. jobStatus = self.__check_job_status()
  422. except HodInterruptException, h:
  423. self.__log.info(HOD_INTERRUPTED_MESG)
  424. self.delete_job(self.jobId)
  425. self.__log.info("Cluster %s removed from queue." % self.jobId)
  426. raise h
  427. else:
  428. if jobStatus == -1:
  429. self.delete_job(self.jobId);
  430. status = 4
  431. return status
  432. if jobStatus:
  433. self.__log.info("Cluster Id %s" \
  434. % self.jobId)
  435. try:
  436. self.ringmasterXRS = self.__get_ringmaster_client()
  437. self.__log.debug("Ringmaster at : %s" % self.ringmasterXRS )
  438. ringClient = None
  439. if self.ringmasterXRS:
  440. ringClient = hodXRClient(self.ringmasterXRS)
  441. hdfsStatus, hdfsAddr, self.hdfsInfo = \
  442. self.__init_hadoop_service('hdfs', ringClient)
  443. if hdfsStatus:
  444. self.__log.info("HDFS UI at http://%s" % self.hdfsInfo)
  445. mapredStatus, mapredAddr, self.mapredInfo = \
  446. self.__init_hadoop_service('mapred', ringClient)
  447. if mapredStatus:
  448. self.__log.info("Mapred UI at http://%s" % self.mapredInfo)
  449. if self.__cfg['hod'].has_key('update-worker-info') \
  450. and self.__cfg['hod']['update-worker-info']:
  451. workerInfoMap = {}
  452. workerInfoMap['HDFS UI'] = 'http://%s' % self.hdfsInfo
  453. workerInfoMap['Mapred UI'] = 'http://%s' % self.mapredInfo
  454. if mapredAddr.find(':') != -1:
  455. workerInfoMap['Mapred RPC Port'] = mapredAddr.split(':')[1]
  456. ret = self.__nodePool.updateWorkerInfo(workerInfoMap, self.jobId)
  457. if ret != 0:
  458. self.__log.warn('Could not update HDFS and Mapred information.' \
  459. 'User Portal may not show relevant information.' \
  460. 'Error code=%s' % ret)
  461. self.__cfg.replace_escape_seqs()
  462. # Go generate the client side hadoop-site.xml now
  463. # adding final-params as well, just so that conf on
  464. # client-side and server-side are (almost) the same
  465. clientParams = None
  466. serverParams = {}
  467. finalServerParams = {}
  468. # client-params
  469. if self.__cfg['hod'].has_key('client-params'):
  470. clientParams = self.__cfg['hod']['client-params']
  471. # server-params
  472. if self.__cfg['gridservice-mapred'].has_key('server-params'):
  473. serverParams.update(\
  474. self.__cfg['gridservice-mapred']['server-params'])
  475. if self.__cfg['gridservice-hdfs'].has_key('server-params'):
  476. # note that if there are params in both mapred and hdfs
  477. # sections, the ones in hdfs overwirte the ones in mapred
  478. serverParams.update(\
  479. self.__cfg['gridservice-hdfs']['server-params'])
  480. # final-server-params
  481. if self.__cfg['gridservice-mapred'].has_key(\
  482. 'final-server-params'):
  483. finalServerParams.update(\
  484. self.__cfg['gridservice-mapred']['final-server-params'])
  485. if self.__cfg['gridservice-hdfs'].has_key(
  486. 'final-server-params'):
  487. finalServerParams.update(\
  488. self.__cfg['gridservice-hdfs']['final-server-params'])
  489. clusterFactor = self.__cfg['hod']['cluster-factor']
  490. tempDir = self.__cfg['hod']['temp-dir']
  491. if not os.path.exists(tempDir):
  492. os.makedirs(tempDir)
  493. tempDir = os.path.join( tempDir, self.__cfg['hod']['userid']\
  494. + "." + self.jobId )
  495. mrSysDir = getMapredSystemDirectory(self.__cfg['hodring']['mapred-system-dir-root'],\
  496. self.__cfg['hod']['userid'], self.jobId)
  497. self.__hadoopCfg.gen_site_conf(clusterDir, tempDir, min,\
  498. hdfsAddr, mrSysDir, mapredAddr, clientParams,\
  499. serverParams, finalServerParams,\
  500. clusterFactor)
  501. self.__log.info("hadoop-site.xml at %s" % clusterDir)
  502. # end of hadoop-site.xml generation
  503. else:
  504. status = 8
  505. else:
  506. status = 7
  507. else:
  508. status = 6
  509. if status != 0:
  510. self.__log.debug("Cleaning up cluster id %s, as cluster could not be allocated." % self.jobId)
  511. if ringClient is None:
  512. self.delete_job(self.jobId)
  513. else:
  514. self.__log.debug("Calling rm.stop()")
  515. ringClient.stopRM()
  516. self.__log.debug("Returning from rm.stop()")
  517. except HodInterruptException, h:
  518. self.__log.info(HOD_INTERRUPTED_MESG)
  519. if self.ringmasterXRS:
  520. if ringClient is None:
  521. ringClient = hodXRClient(self.ringmasterXRS)
  522. self.__log.debug("Calling rm.stop()")
  523. ringClient.stopRM()
  524. self.__log.debug("Returning from rm.stop()")
  525. self.__log.info("Cluster Shutdown by informing ringmaster.")
  526. else:
  527. self.delete_job(self.jobId)
  528. self.__log.info("Cluster %s removed from queue directly." % self.jobId)
  529. raise h
  530. else:
  531. self.__log.critical("No cluster found, ringmaster failed to run.")
  532. status = 5
  533. elif self.jobId == False:
  534. if exitCode == 188:
  535. self.__log.critical("Request execeeded maximum resource allocation.")
  536. else:
  537. self.__log.critical("Insufficient resources available.")
  538. status = 4
  539. else:
  540. self.__log.critical("Scheduler failure, allocation failed.\n\n")
  541. status = 4
  542. if status == 5 or status == 6:
  543. ringMasterErrors = self.__svcrgyClient.getRMError()
  544. if ringMasterErrors:
  545. self.__log.critical("Cluster could not be allocated because" \
  546. " of the following errors on the "\
  547. "ringmaster host %s.\n%s" % \
  548. (ringMasterErrors[0], ringMasterErrors[1]))
  549. self.__log.debug("Stack trace on ringmaster: %s" % ringMasterErrors[2])
  550. return status
  551. def __isRingMasterAlive(self, rmAddr):
  552. ret = True
  553. rmSocket = tcpSocket(rmAddr)
  554. try:
  555. rmSocket.open()
  556. rmSocket.close()
  557. except tcpError:
  558. ret = False
  559. return ret
  560. def deallocate(self, clusterDir, clusterInfo):
  561. status = 0
  562. nodeSet = self.__nodePool.newNodeSet(clusterInfo['min'],
  563. id=clusterInfo['jobid'])
  564. self.mapredInfo = clusterInfo['mapred']
  565. self.hdfsInfo = clusterInfo['hdfs']
  566. try:
  567. if self.__cfg['hod'].has_key('hadoop-ui-log-dir'):
  568. clusterStatus = self.check_cluster(clusterInfo)
  569. if clusterStatus != 14 and clusterStatus != 10:
  570. # If JT is still alive
  571. self.__collect_jobtracker_ui(self.__cfg['hod']['hadoop-ui-log-dir'])
  572. else:
  573. self.__log.debug('hadoop-ui-log-dir not specified. Skipping Hadoop UI log collection.')
  574. except HodInterruptException, h:
  575. # got an interrupt. just pass and proceed to qdel
  576. pass
  577. except:
  578. self.__log.info("Exception in collecting Job tracker logs. Ignoring.")
  579. rmAddr = None
  580. if clusterInfo.has_key('ring'):
  581. # format is http://host:port/ We need host:port
  582. rmAddr = clusterInfo['ring'][7:]
  583. if rmAddr.endswith('/'):
  584. rmAddr = rmAddr[:-1]
  585. if (rmAddr is None) or (not self.__isRingMasterAlive(rmAddr)):
  586. # Cluster is already dead, don't try to contact ringmaster.
  587. self.__nodePool.finalize()
  588. status = 10 # As cluster is dead, we just set the status to 'cluster dead'.
  589. else:
  590. xrsAddr = clusterInfo['ring']
  591. rmClient = hodXRClient(xrsAddr)
  592. self.__log.debug('calling rm.stop')
  593. rmClient.stopRM()
  594. self.__log.debug('completed rm.stop')
  595. # cleanup hod temp dirs
  596. tempDir = os.path.join( self.__cfg['hod']['temp-dir'], \
  597. self.__cfg['hod']['userid'] + "." + clusterInfo['jobid'] )
  598. if os.path.exists(tempDir):
  599. shutil.rmtree(tempDir)
  600. return status
  601. class hadoopScript:
  602. def __init__(self, conf, execDir):
  603. self.__environ = os.environ.copy()
  604. self.__environ['HADOOP_CONF_DIR'] = conf
  605. self.__execDir = execDir
  606. def run(self, script):
  607. scriptThread = simpleCommand(script, script, self.__environ, 4, False,
  608. False, self.__execDir)
  609. scriptThread.start()
  610. scriptThread.wait()
  611. scriptThread.join()
  612. return scriptThread.exit_code()