# hadoop.py
  #Licensed to the Apache Software Foundation (ASF) under one
  #or more contributor license agreements. See the NOTICE file
  #distributed with this work for additional information
  #regarding copyright ownership. The ASF licenses this file
  #to you under the Apache License, Version 2.0 (the
  #"License"); you may not use this file except in compliance
  #with the License. You may obtain a copy of the License at
  # http://www.apache.org/licenses/LICENSE-2.0
  #Unless required by applicable law or agreed to in writing, software
  #distributed under the License is distributed on an "AS IS" BASIS,
  #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  #See the License for the specific language governing permissions and
  #limitations under the License.
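  """Hadoop support for HOD: client hadoop-site.xml generation (hadoopConfig),
  cluster allocation and deallocation (hadoopCluster), and running scripts
  against a cluster configuration directory (hadoopScript)."""
  # -*- python -*-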
  16. import os, time, sys, shutil, exceptions, re, threading, signal, urllib, pprint, math
  17. from HTMLParser import HTMLParser
  18. import xml.dom.minidom
  19. import xml.dom.pulldom
  20. from xml.dom import getDOMImplementation
  21. from hodlib.Common.util import *
  22. from hodlib.Common.xmlrpc import hodXRClient
  23. from hodlib.Common.miniHTMLParser import miniHTMLParser
  24. from hodlib.Common.nodepoolutil import NodePoolUtil
  25. from hodlib.Common.tcp import tcpError, tcpSocket
  # Pattern for an unescaped semicolon: a ';' that is NOT immediately preceded
  # by a backslash, so "a;b" splits while "a\;b" does not. Judging by the name
  # it is meant for splitting command strings; nothing else in this module
  # references it, so it is presumably used by importers.
  reCommandDelimeterString = r"(?<!\\);"

  # Precompiled form of the pattern above.
  reCommandDelimeter = re.compile(reCommandDelimeterString)
  class hadoopConfig:
      """Generates the client-side hadoop-site.xml for a HOD cluster."""

      def __create_xml_element(self, doc, name, value, description, final=False):
          """Build one <property> element (not yet attached to the document).

          Children, in order: <name>, <value>, <final>true</final> (only when
          ``final`` is true) and <description>.

          doc -- xml.dom Document used as the node factory.
          """
          prop = doc.createElement("property")
          children = [("name", name), ("value", value)]
          if final:
              children.append(("final", "true"))
          children.append(("description", description))
          for tag, text in children:
              child = doc.createElement(tag)
              child.appendChild(doc.createTextNode(text))
              prop.appendChild(child)
          return prop

      def gen_site_conf(self, confDir, tempDir, numNodes, hdfsAddr, mrSysDir,
                        mapredAddr=None, clientParams=None, serverParams=None,
                        finalServerParams=None, clusterFactor=None):
          """Write ``<confDir>/hadoop-site.xml``.

          confDir       -- directory the file is written to.
          tempDir       -- value of hadoop.tmp.dir; dfs.client.buffer.dir is
                           set to <tempDir>/dfs/tmp.
          numNodes      -- node count (anything int() accepts).
          hdfsAddr      -- namenode address; written as "hdfs://" + hdfsAddr.
          mrSysDir      -- value of mapred.system.dir.
          mapredAddr    -- value of mapred.job.tracker; "dummy:8181" if empty.
          clientParams, serverParams, finalServerParams
                        -- optional dicts of additional properties.
          clusterFactor -- when given, mapred.reduce.tasks is set to
                           floor(clusterFactor * (numNodes - 1)); when None
                           (the default) that property is omitted instead of
                           failing with a TypeError.

          Precedence for a key present in several sources:
            * HOD-generated values are never overridden;
            * finalServerParams fill in keys not generated by HOD;
            * serverParams fill in keys not already set;
            * clientParams fill in keys not already set and replace
              serverParams values, but not HOD-generated or final ones.
          HOD-generated and final server values are marked <final>true</final>.
          Properties are written in sorted key order so the file is
          reproducible.
          """
          if not mapredAddr:
              mapredAddr = "dummy:8181"

          implementation = getDOMImplementation()
          doc = implementation.createDocument('', 'configuration', None)
          topElement = doc.documentElement
          topElement.appendChild(doc.createComment(
              "This is an auto generated hadoop-site.xml, do not modify"))

          paramsDict = {
              'mapred.job.tracker': mapredAddr,
              'fs.default.name': "hdfs://" + hdfsAddr,
              'hadoop.tmp.dir': tempDir,
              'dfs.client.buffer.dir': os.path.join(tempDir, 'dfs', 'tmp'),
              'mapred.system.dir': mrSysDir,
          }
          # mapred-default.xml is no longer used, so the reduce-task count is
          # generated here.
          if clusterFactor is not None:
              numred = int(math.floor(clusterFactor * (int(numNodes) - 1)))
              paramsDict['mapred.reduce.tasks'] = str(numred)

          # description records where each value came from; it drives both the
          # precedence checks below and the <final> flag.
          description = dict.fromkeys(paramsDict, 'Hod generated parameter')

          for k, v in (finalServerParams or {}).items():
              if k not in description:
                  description[k] = "final server parameter"
                  paramsDict[k] = v

          for k, v in (serverParams or {}).items():
              # only if no generated or final value for the same key exists
              if k not in description:
                  description[k] = "server parameter"
                  paramsDict[k] = v

          for k, v in (clientParams or {}).items():
              # Add if absent; replace a plain server parameter, but never a
              # HOD-generated or final one.
              if k not in description or description[k] == "server parameter":
                  description[k] = "client-side parameter"
                  paramsDict[k] = v

          finalSources = ("final server parameter", "Hod generated parameter")
          for k in sorted(paramsDict):
              prop = self.__create_xml_element(doc, k, paramsDict[k],
                                               description[k],
                                               description[k] in finalSources)
              topElement.appendChild(prop)

          siteName = os.path.join(confDir, "hadoop-site.xml")
          sitefile = open(siteName, 'w')
          try:
              # Same output the former ``print >> sitefile, ...`` produced: the
              # element's XML followed by a newline.
              sitefile.write(topElement.toxml() + "\n")
          finally:
              sitefile.close()
  class hadoopCluster:
      """Allocate and deallocate a Hadoop cluster for HOD.

      Uses the node pool (job submission, job state, deletion), the service
      registry at hod.xrs-address and the ringmaster's XML-RPC interface to
      bring up HDFS and Map/Reduce and to write the client hadoop-site.xml.
      """

      def __init__(self, cfg, log):
          """cfg -- HOD configuration, indexed by section (cfg['hod'], ...);
          log -- logger used for all messages."""
          self.__cfg = cfg
          self.__log = log
          self.__changedClusterParams = []
          self.__hostname = local_fqdn()
          # service registry client; set by allocate()
          self.__svcrgyClient = None
          self.__nodePool = NodePoolUtil.getNodePool(self.__cfg['nodepooldesc'],
                                                     self.__cfg, self.__log)
          self.__hadoopCfg = hadoopConfig()
          self.jobId = None
          self.mapredInfo = None
          self.hdfsInfo = None
          self.ringmasterXRS = None

      def __get_svcrgy_client(self):
          """Return an XML-RPC client for the service registry (hod.xrs-address)."""
          svcrgyUrl = to_http_url(self.__cfg['hod']['xrs-address'])
          return hodXRClient(svcrgyUrl)

      def __get_service_status(self):
          """Check the registry for the hdfs and mapred grid services.

          Returns True when mapred is registered and hdfs is either registered
          or external. Otherwise returns "mapred", "hdfs" or "mapred and hdfs",
          naming what is missing.
          """
          serviceData = self.__get_service_data()
          hdfs = False
          mapred = False
          for items in serviceData.values():
              for item in items:
                  service = list(item.keys())[0]
                  # '== False' kept deliberately: 'external' is compared, not
                  # tested for truthiness.
                  if service == 'hdfs.grid' and \
                     self.__cfg['gridservice-hdfs']['external'] == False:
                      hdfs = True
                  elif service == 'mapred.grid':
                      mapred = True
          missing = []
          if not mapred:
              missing.append("mapred")
          if not hdfs and self.__cfg['gridservice-hdfs']['external'] == False:
              missing.append("hdfs")
          if not missing:
              return True
          return " and ".join(missing)

      def __get_service_data(self):
          """Fetch this user's service registrations from the service registry."""
          # NOTE(review): this used to pass self.__setup.np.getNodePoolId(), but
          # this class has no __setup attribute, so every call raised
          # AttributeError. The node pool's service id is what the ringmaster
          # lookup passes to the same registry call; confirm it is the right
          # key here as well.
          return self.__svcrgyClient.getServiceInfo(
              self.__cfg['hod']['userid'], self.__nodePool.getServiceId())

      def __check_job_status(self):
          """Poll the node pool until the job leaves the queued ('Q') state.

          A failed query (state False) is retried up to
          hod.job-status-query-failure-retries times.
          Returns True if the final state is truthy and not 'C'.
          Raises HodInterruptException if an interrupt is pending.
          """
          failureCount = 0
          state = 'Q'
          while state == 'Q' or state == False:
              if hodInterrupt.isSet():
                  raise HodInterruptException()
              state = self.__nodePool.getJobState()
              self.__log.debug('job state %s' % state)
              if state == False:
                  failureCount += 1
                  if failureCount >= self.__cfg['hod']['job-status-query-failure-retries']:
                      self.__log.debug('Number of retries reached max limit while querying job status')
                      break
                  time.sleep(self.__cfg['hod']['job-command-failure-interval'])
              elif state != 'Q':
                  break
              else:
                  self.__log.debug('querying for job status after job-status-query-interval')
                  time.sleep(self.__cfg['hod']['job-status-query-interval'])
          return bool(state) and state != 'C'

      def __query_ringmaster_xrs(self):
          """Ask the registry once for the ringmaster; return its 'xrs' or None."""
          ringList = self.__svcrgyClient.getServiceInfo(
              self.__cfg['ringmaster']['userid'], self.__nodePool.getServiceId(),
              'ringmaster', 'hod')
          if ringList and isinstance(ringList, list):
              return ringList[0]['xrs']
          return None

      def __get_ringmaster_client(self):
          """Return the ringmaster's XML-RPC address, waiting for it to register.

          If the first lookup fails, retries once a second for up to
          hod.allocate-wait-time seconds. Every job-status-query-interval
          retries it also checks that the job is still alive.
          Returns None if the ringmaster never shows up.
          """
          ringmasterXRS = self.__query_ringmaster_xrs()
          if ringmasterXRS is not None:
              return ringmasterXRS
          count = 0
          waitTime = self.__cfg['hod']['allocate-wait-time']
          while count < waitTime:
              if hodInterrupt.isSet():
                  raise HodInterruptException()
              ringmasterXRS = self.__query_ringmaster_xrs()
              if ringmasterXRS is not None:
                  break
              time.sleep(1)
              count += 1
              # check to see if the job exited by any chance in that time
              if count % self.__cfg['hod']['job-status-query-interval'] == 0:
                  if not self.__check_job_status():
                      break
          return ringmasterXRS

      def __init_hadoop_service(self, serviceName, xmlrpcClient):
          """Wait for the ringmaster to start ``serviceName`` and register it.

          Polls xmlrpcClient.getServiceAddr(serviceName) up to 250 times, one
          second apart. While the service is not up, checks every
          job-status-query-interval polls that the job is still alive. Once the
          address is known, fetches its URLs and registers the service with the
          service registry.

          Returns (status, serviceAddress, serviceInfo). status is False if the
          address could not be obtained or any XML-RPC call failed.
          """
          status = True
          serviceAddress = None
          serviceInfo = None
          for i in range(250):
              try:
                  if hodInterrupt.isSet():
                      raise HodInterruptException()
                  serviceAddress = xmlrpcClient.getServiceAddr(serviceName)
                  if serviceAddress and serviceAddress != 'not found':
                      serviceInfo = xmlrpcClient.getURLs(serviceName)
                      break
                  # Not up yet. An empty reply is treated like 'not found' so
                  # the ringmaster is not hammered with back-to-back queries.
                  time.sleep(1)
                  # check to see if the job exited by any chance in that time
                  if (i + 1) % self.__cfg['hod']['job-status-query-interval'] == 0:
                      if not self.__check_job_status():
                          break
              except HodInterruptException:
                  raise
              except Exception:
                  self.__log.critical("'%s': ringmaster xmlrpc error." % serviceName)
                  self.__log.debug(get_exception_string())
                  status = False
                  break

          if serviceAddress == 'not found' or not serviceAddress:
              self.__log.critical("Failed to retrieve '%s' service address." %
                                  serviceName)
              status = False
          elif status:
              # Register only when getURLs() succeeded; do not advertise the
              # service with serviceInfo=None.
              try:
                  self.__svcrgyClient.registerService(self.__cfg['hodring']['userid'],
                                                      self.jobId, self.__hostname,
                                                      serviceName, 'grid', serviceInfo)
              except HodInterruptException:
                  raise
              except Exception:
                  self.__log.critical("'%s': registry xmlrpc error." % serviceName)
                  self.__log.debug(get_exception_string())
                  status = False
          return status, serviceAddress, serviceInfo

      def __collect_jobtracker_ui(self, dir):
          """Save the JobTracker web UI pages under ``dir``.

          Starts at <mapredInfo>/jobtracker.jsp and keeps asking the
          miniHTMLParser for the next discovered link. Only jobfailures,
          jobtracker, jobdetails and jobtasks pages are downloaded; taskstats
          and taskdetails pages are skipped since there are too many of them.
          Page failures are logged and skipped. An interrupt raises
          HodInterruptException.
          """
          link = self.mapredInfo + "/jobtracker.jsp"
          parser = miniHTMLParser()
          parser.setBaseUrl(self.mapredInfo)
          self.__log.debug("collect_jobtracker_ui seeded with " + link)
          pagePattern = re.compile(
              r"jobfailures\.jsp|jobtracker\.jsp|jobdetails\.jsp|jobtasks\.jsp")
          # Rewrites applied, in this order, to each chunk before saving it so
          # that hrefs point at the locally saved file names. Compiled once
          # rather than once per chunk.
          # NOTE(review): chunks are rewritten independently, so a match that
          # straddles an 8 KB chunk boundary is not rewritten.
          rewrites = [
              (re.compile(r"\?(.+?)=(.+?)"), r"_\1_\2"),
              (re.compile(r"&(.+?)=(.+?)"), r"_\1_\2"),
              (re.compile(r"http://(.+?):(\d+)?"), r"\1_\2/"),
              (re.compile('href="/'), 'href="'),
              (re.compile(r'href="(.+?)"'), r"href=\1.html"),
          ]

          def alarm_handler(number, stack):
              raise AlarmException("timeout")
          oldHandler = signal.signal(signal.SIGALRM, alarm_handler)
          try:
              while link:
                  self.__log.debug("link: %s" % link)
                  if pagePattern.search(link):
                      self.__save_jobtracker_page(link, dir, parser, rewrites)
                  # Get the next link in level traversal order
                  link = parser.getNextLink()
          finally:
              # Never leave a pending alarm (it would fire in unrelated code,
              # e.g. during rm.stop() in deallocate) or our handler behind.
              signal.alarm(0)
              if oldHandler is not None:
                  signal.signal(signal.SIGALRM, oldHandler)
          parser.close()

      def __save_jobtracker_page(self, link, dir, parser, rewrites):
          """Download ``link``, feed it to ``parser`` and save a rewritten copy.

          The copy is written to <dir>/<link minus mapredInfo and 'http://',
          with ?&=: replaced by '_'>.html. Reading is bounded by a 10 second
          SIGALRM; the caller installs a handler that raises AlarmException.
          """
          response = None  # fresh for every page
          for _ in range(4):
              if hodInterrupt.isSet():
                  raise HodInterruptException()
              try:
                  response = urllib.urlopen(link)
                  break
              except Exception:
                  self.__log.debug(get_exception_string())
                  time.sleep(1)
          if response is None:
              self.__log.debug("Failed to retrieve: %s" % link)
              return

          self.__log.debug("collecting " + link + "...")
          # literal (not regex) removal of the JobTracker address
          filename = dir + "/" + link.replace(self.mapredInfo, "")
          filename = filename.replace("http://", "")
          filename = re.sub(r"[\?\&=:]", "_", filename) + ".html"
          try:
              tempdir = os.path.dirname(filename)
              if not os.path.exists(tempdir):
                  os.makedirs(tempdir)
          except Exception:
              self.__log.debug(get_exception_string())

          bufSz = 8192
          try:
              out = open(filename, 'w')
              try:
                  signal.alarm(10)
                  self.__log.debug("Starting to grab: %s" % link)
                  buf = response.read(bufSz)
                  while len(buf) > 0:
                      # Feed the file into the HTML parser
                      parser.feed(buf)
                      # Re-write the hrefs in the file
                      for pattern, replacement in rewrites:
                          buf = pattern.sub(replacement, buf)
                      out.write(buf)
                      buf = response.read(bufSz)
                  signal.alarm(0)
                  self.__log.debug("Finished grabbing: %s" % link)
              except AlarmException:
                  if hodInterrupt.isSet():
                      raise HodInterruptException()
                  self.__log.debug("Failed to retrieve: %s" % link)
              finally:
                  signal.alarm(0)
                  out.close()
          finally:
              response.close()

      def check_cluster(self, clusterInfo):
          """Return a status code for the cluster described by clusterInfo.

          Without a 'mapred' entry the result is 15. Otherwise the first 7
          characters ('http://') are stripped from the mapred and hdfs
          addresses and get_cluster_status() is asked; its 0 is reported as 12,
          and any other value is returned unchanged.
          """
          if 'mapred' not in clusterInfo:
              return 15
          mapredAddress = clusterInfo['mapred'][7:]
          hdfsAddress = clusterInfo['hdfs'][7:]
          status = get_cluster_status(hdfsAddress, mapredAddress)
          if status == 0:
              status = 12
          return status

      def cleanup(self):
          """Finalize the node pool, if there is one."""
          if self.__nodePool:
              self.__nodePool.finalize()

      def get_job_id(self):
          """Return the id of the job submitted by allocate() (or None)."""
          return self.jobId

      def delete_job(self, jobId):
          """Delete a job given its ID; returns the node pool's result.

          Raises Exception if the node pool is not initialized.
          """
          if not self.__nodePool:
              raise Exception("Invalid state: Node pool is not initialized to delete the given job.")
          return self.__nodePool.deleteJob(jobId)

      def allocate(self, clusterDir, min, max=None):
          """Submit a job for ``min`` nodes and bring up HDFS and Map/Reduce.

          On success writes hadoop-site.xml into clusterDir and returns 0.
          Other return values:
            2 -- min < 3
            4 -- job submission failed or the scheduler returned no job id
            5 -- the job never reached a usable state
            6 -- the ringmaster could not be located
            7 -- HDFS could not be brought up
            8 -- Map/Reduce could not be brought up
          For 6/7/8 the job is cleaned up before returning. On
          HodInterruptException the job is cleaned up and the exception
          re-raised. ``max`` is only logged.
          """
          self.__svcrgyClient = self.__get_svcrgy_client()
          self.__log.debug("allocate %s %s %s" % (clusterDir, min, max))
          if min < 3:
              self.__log.critical("Minimum nodes must be greater than 2.")
              return 2

          nodeSet = self.__nodePool.newNodeSet(min)
          walltime = self.__cfg['hod'].get('walltime')
          self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet, walltime)
          # if the job submission returned an error other than no resources
          # (exit code 188) retry a couple of times
          failureCount = 0
          while self.jobId is False and exitCode != 188:
              if hodInterrupt.isSet():
                  raise HodInterruptException()
              failureCount += 1
              if failureCount >= self.__cfg['hod']['job-status-query-failure-retries']:
                  self.__log.debug("failed submitting job more than the retries. exiting")
                  break
              # wait a bit before retrying
              time.sleep(self.__cfg['hod']['job-command-failure-interval'])
              if hodInterrupt.isSet():
                  raise HodInterruptException()
              self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet, walltime)

          if self.jobId:
              return self.__start_cluster(clusterDir, min)
          if self.jobId == False:
              if exitCode == 188:
                  self.__log.critical("Request exceeded maximum resource allocation.")
              else:
                  self.__log.critical("Job submission failed with exit code %s" % exitCode)
          else:
              self.__log.critical("Scheduler failure, allocation failed.\n\n")
          return 4

      def __start_cluster(self, clusterDir, numNodes):
          """Bring up the cluster for the already-submitted job self.jobId.

          Returns 0, 5, 6, 7 or 8 (see allocate). On 6/7/8, and on
          HodInterruptException (which is re-raised), the job is cleaned up:
          through the ringmaster when one is known, else by deleting the job.
          """
          try:
              jobStatus = self.__check_job_status()
          except HodInterruptException:
              self.__log.info(HOD_INTERRUPTED_MESG)
              self.delete_job(self.jobId)
              self.__log.info("Job %s removed from queue." % self.jobId)
              raise
          if not jobStatus:
              self.__log.critical("No job found, ringmaster failed to run.")
              return 5

          self.__log.info("Cluster Id %s" % self.jobId)
          # Bind ringClient before the try so the interrupt handler can always
          # test it (it used to be unbound if the ringmaster lookup was
          # interrupted). Reset ringmasterXRS so an address left over from an
          # earlier allocate() is never mistaken for this job's ringmaster.
          ringClient = None
          self.ringmasterXRS = None
          try:
              self.ringmasterXRS = self.__get_ringmaster_client()
              self.__log.debug("Ringmaster at : %s" % self.ringmasterXRS)
              if self.ringmasterXRS:
                  ringClient = hodXRClient(self.ringmasterXRS)
              status = self.__start_services(clusterDir, numNodes, ringClient)
              if status != 0:
                  self.__log.info("Cleaning up cluster id %s, as cluster could not be allocated." % self.jobId)
                  if ringClient is None:
                      self.delete_job(self.jobId)
                  else:
                      self.__log.debug("Calling rm.stop()")
                      ringClient.stopRM()
                      self.__log.debug("Returning from rm.stop()")
          except HodInterruptException:
              self.__log.info(HOD_INTERRUPTED_MESG)
              if self.ringmasterXRS:
                  if ringClient is None:
                      ringClient = hodXRClient(self.ringmasterXRS)
                  self.__log.debug("Calling rm.stop()")
                  ringClient.stopRM()
                  self.__log.debug("Returning from rm.stop()")
                  self.__log.info("Job Shutdown by informing ringmaster.")
              else:
                  self.delete_job(self.jobId)
                  self.__log.info("Job %s removed from queue directly." % self.jobId)
              raise
          return status

      def __start_services(self, clusterDir, numNodes, ringClient):
          """Start HDFS, then Map/Reduce, through ``ringClient``.

          Returns 0 on success (after writing hadoop-site.xml), 6 when there is
          no ringmaster client, 7 if HDFS failed and 8 if Map/Reduce failed.
          """
          if ringClient is None:
              return 6
          hdfsStatus, hdfsAddr, self.hdfsInfo = \
              self.__init_hadoop_service('hdfs', ringClient)
          if not hdfsStatus:
              return 7
          self.__log.info("HDFS UI at http://%s" % self.hdfsInfo)
          mapredStatus, mapredAddr, self.mapredInfo = \
              self.__init_hadoop_service('mapred', ringClient)
          if not mapredStatus:
              return 8
          self.__log.info("Mapred UI at http://%s" % self.mapredInfo)
          self.__update_worker_info()
          self.__write_client_site_conf(clusterDir, numNodes, hdfsAddr, mapredAddr)
          return 0

      def __update_worker_info(self):
          """If hod.update-worker-info is set, publish the HDFS and Map/Reduce
          UI URLs through the node pool. A non-zero result is only logged."""
          if not self.__cfg['hod'].get('update-worker-info'):
              return
          workerInfoMap = {
              'HDFS UI': 'http://%s' % self.hdfsInfo,
              'Mapred UI': 'http://%s' % self.mapredInfo,
          }
          ret = self.__nodePool.updateWorkerInfo(workerInfoMap, self.jobId)
          if ret != 0:
              self.__log.warn('Could not update HDFS and Mapred information. '
                              'User Portal may not show relevant information. '
                              'Error code=%s' % ret)

      def __write_client_site_conf(self, clusterDir, numNodes, hdfsAddr, mapredAddr):
          """Generate the client-side hadoop-site.xml in clusterDir.

          Final server params are included as well, so that the client-side
          and server-side configuration are (almost) the same.
          """
          self.__cfg.replace_escape_seqs()
          hodCfg = self.__cfg['hod']
          clientParams = hodCfg.get('client-params')
          serverParams = {}
          finalServerParams = {}
          # Mapred first, then hdfs: where both sections define the same key,
          # the hdfs value overwrites the mapred one.
          for section in (self.__cfg['gridservice-mapred'],
                          self.__cfg['gridservice-hdfs']):
              if 'server-params' in section:
                  serverParams.update(section['server-params'])
              if 'final-server-params' in section:
                  finalServerParams.update(section['final-server-params'])

          clusterFactor = hodCfg['cluster-factor']
          tempDir = hodCfg['temp-dir']
          if not os.path.exists(tempDir):
              os.makedirs(tempDir)
          tempDir = os.path.join(tempDir, hodCfg['userid'] + "." + self.jobId)
          mrSysDir = getMapredSystemDirectory(
              self.__cfg['hodring']['mapred-system-dir-root'],
              hodCfg['userid'], self.jobId)
          self.__hadoopCfg.gen_site_conf(clusterDir, tempDir, numNodes,
                                         hdfsAddr, mrSysDir, mapredAddr,
                                         clientParams, serverParams,
                                         finalServerParams, clusterFactor)
          self.__log.info("hadoop-site.xml at %s" % clusterDir)

      def __isRingMasterAlive(self, rmAddr):
          """Return True if a TCP connection to rmAddr (host:port) succeeds."""
          rmSocket = tcpSocket(rmAddr)
          try:
              rmSocket.open()
              rmSocket.close()
          except tcpError:
              return False
          return True

      def deallocate(self, clusterDir, clusterInfo):
          """Tear down the cluster described by clusterInfo.

          If hod.hadoop-ui-log-dir is set and the JobTracker looks alive,
          first saves its UI pages (best effort). Then stops the ringmaster if
          it is reachable, or finalizes the node pool directly if not. Finally
          removes the job's HOD temp dir.
          Returns 0, or 10 when the ringmaster was unreachable (cluster dead).
          """
          status = 0
          # NOTE(review): the node set itself is not used below; the call is
          # kept in case newNodeSet() registers it with the pool as a side
          # effect that finalize() relies on.
          self.__nodePool.newNodeSet(clusterInfo['min'], id=clusterInfo['jobid'])
          self.mapredInfo = clusterInfo['mapred']
          self.hdfsInfo = clusterInfo['hdfs']
          try:
              if 'hadoop-ui-log-dir' in self.__cfg['hod']:
                  clusterStatus = self.check_cluster(clusterInfo)
                  if clusterStatus != 14 and clusterStatus != 10:
                      # If JT is still alive
                      self.__collect_jobtracker_ui(self.__cfg['hod']['hadoop-ui-log-dir'])
              else:
                  self.__log.debug('hadoop-ui-log-dir not specified. Skipping Hadoop UI log collection.')
          except HodInterruptException:
              # got an interrupt. just pass and proceed to qdel
              pass
          except Exception:
              self.__log.info("Exception in collecting Job tracker logs. Ignoring.")
              self.__log.debug(get_exception_string())

          rmAddr = None
          if 'ring' in clusterInfo:
              # format is http://host:port/ We need host:port
              rmAddr = clusterInfo['ring'][7:]
              if rmAddr.endswith('/'):
                  rmAddr = rmAddr[:-1]

          if rmAddr is None or not self.__isRingMasterAlive(rmAddr):
              # Cluster is already dead, don't try to contact ringmaster.
              self.__nodePool.finalize()
              status = 10  # As cluster is dead, we just set the status to 'cluster dead'.
          else:
              rmClient = hodXRClient(clusterInfo['ring'])
              self.__log.debug('calling rm.stop')
              rmClient.stopRM()
              self.__log.debug('completed rm.stop')

          # cleanup hod temp dirs
          tempDir = os.path.join(self.__cfg['hod']['temp-dir'],
                                 self.__cfg['hod']['userid'] + "." + clusterInfo['jobid'])
          if os.path.exists(tempDir):
              shutil.rmtree(tempDir)
          return status
  class hadoopScript:
      """Run a script with HADOOP_CONF_DIR pointed at a configuration dir."""

      def __init__(self, conf, execDir):
          """conf    -- exported to the script as HADOOP_CONF_DIR;
          execDir -- passed through to simpleCommand (presumably the working
                     directory of the run -- confirm against simpleCommand)."""
          environ = dict(os.environ)
          environ['HADOOP_CONF_DIR'] = conf
          self.__environ = environ
          self.__execDir = execDir

      def run(self, script):
          """Run ``script`` through simpleCommand, wait for it to finish and
          return its exit_code()."""
          command = simpleCommand(script, script, self.__environ, 4, False,
                                  False, self.__execDir)
          command.start()
          command.wait()
          command.join()
          return command.exit_code()