# hadoop.py
  1. #Licensed to the Apache Software Foundation (ASF) under one
  2. #or more contributor license agreements. See the NOTICE file
  3. #distributed with this work for additional information
  4. #regarding copyright ownership. The ASF licenses this file
  5. #to you under the Apache License, Version 2.0 (the
  6. #"License"); you may not use this file except in compliance
  7. #with the License. You may obtain a copy of the License at
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #Unless required by applicable law or agreed to in writing, software
  10. #distributed under the License is distributed on an "AS IS" BASIS,
  11. #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. #See the License for the specific language governing permissions and
  13. #limitations under the License.
  14. """define WorkLoad as abstract interface for user job"""
  15. # -*- python -*-
  16. import os, time, sys, shutil, exceptions, re, threading, signal, urllib, pprint, math
  17. from HTMLParser import HTMLParser
  18. import xml.dom.minidom
  19. import xml.dom.pulldom
  20. from xml.dom import getDOMImplementation
  21. from hodlib.Common.util import *
  22. from hodlib.Common.xmlrpc import hodXRClient
  23. from hodlib.Common.miniHTMLParser import miniHTMLParser
  24. from hodlib.Common.nodepoolutil import NodePoolUtil
  25. from hodlib.Common.tcp import tcpError, tcpSocket
# Matches an unescaped ';' (a semicolon NOT preceded by a backslash, via a
# negative lookbehind) — used to split user-supplied command strings into
# individual commands while letting '\;' pass through literally.
# NOTE: "Delimeter" is a historical misspelling kept for compatibility.
reCommandDelimeterString = r"(?<!\\);"
reCommandDelimeter = re.compile(reCommandDelimeterString)
  28. class hadoopConfig:
  29. def __create_xml_element(self, doc, name, value, description, final = False):
  30. prop = doc.createElement("property")
  31. nameP = doc.createElement("name")
  32. string = doc.createTextNode(name)
  33. nameP.appendChild(string)
  34. valueP = doc.createElement("value")
  35. string = doc.createTextNode(value)
  36. valueP.appendChild(string)
  37. if final:
  38. finalP = doc.createElement("final")
  39. string = doc.createTextNode("true")
  40. finalP.appendChild(string)
  41. desc = doc.createElement("description")
  42. string = doc.createTextNode(description)
  43. desc.appendChild(string)
  44. prop.appendChild(nameP)
  45. prop.appendChild(valueP)
  46. if final:
  47. prop.appendChild(finalP)
  48. prop.appendChild(desc)
  49. return prop
  50. def gen_site_conf(self, confDir, numNodes, hdfsAddr, mapredAddr=None,\
  51. clientParams=None, serverParams=None,\
  52. finalServerParams=None, clusterFactor=None):
  53. if not mapredAddr:
  54. mapredAddr = "dummy:8181"
  55. implementation = getDOMImplementation()
  56. doc = implementation.createDocument('', 'configuration', None)
  57. comment = doc.createComment(
  58. "This is an auto generated hadoop-site.xml, do not modify")
  59. topElement = doc.documentElement
  60. topElement.appendChild(comment)
  61. prop = self.__create_xml_element(doc, 'mapred.job.tracker',
  62. mapredAddr, "description")
  63. topElement.appendChild(prop)
  64. prop = self.__create_xml_element(doc, 'fs.default.name', hdfsAddr,
  65. "description")
  66. topElement.appendChild(prop)
  67. mapredAddrSplit = mapredAddr.split(":")
  68. mapredsystem = os.path.join('/mapredsystem', mapredAddrSplit[0])
  69. prop = self.__create_xml_element(doc, 'mapred.system.dir', mapredsystem,
  70. "description", True )
  71. topElement.appendChild(prop)
  72. prop = self.__create_xml_element(doc, 'hadoop.tmp.dir', confDir,
  73. "description")
  74. topElement.appendChild(prop)
  75. prop = self.__create_xml_element(doc, 'dfs.client.buffer.dir',
  76. confDir, "description")
  77. topElement.appendChild(prop)
  78. # clientParams aer enabled now
  79. if clientParams:
  80. for k, v in clientParams.iteritems():
  81. prop = self.__create_xml_element(doc, k, v[0], "client param")
  82. topElement.appendChild(prop)
  83. # end
  84. # servelParams
  85. if serverParams:
  86. for k, v in serverParams.iteritems():
  87. prop = self.__create_xml_element(doc, k, v[0], "server param")
  88. topElement.appendChild(prop)
  89. # finalservelParams
  90. if finalServerParams:
  91. for k, v in finalServerParams.iteritems():
  92. prop = self.__create_xml_element(doc, k, v[0], "server param", True)
  93. topElement.appendChild(prop)
  94. # mapred-default.xml is no longer used now.
  95. numred = int(math.floor(clusterFactor * (int(numNodes) - 1)))
  96. prop = self.__create_xml_element(doc, "mapred.reduce.tasks", str(numred),
  97. "description")
  98. topElement.appendChild(prop)
  99. # end
  100. siteName = os.path.join(confDir, "hadoop-site.xml")
  101. sitefile = file(siteName, 'w')
  102. print >> sitefile, topElement.toxml()
  103. sitefile.close()
  104. class hadoopCluster:
  105. def __init__(self, cfg, log):
  106. self.__cfg = cfg
  107. self.__log = log
  108. self.__changedClusterParams = []
  109. self.__hostname = local_fqdn()
  110. self.__svcrgyClient = None
  111. self.__nodePool = NodePoolUtil.getNodePool(self.__cfg['nodepooldesc'],
  112. self.__cfg, self.__log)
  113. self.__hadoopCfg = hadoopConfig()
  114. self.jobId = None
  115. self.mapredInfo = None
  116. self.hdfsInfo = None
  117. self.ringmasterXRS = None
  118. def __get_svcrgy_client(self):
  119. svcrgyUrl = to_http_url(self.__cfg['hod']['xrs-address'])
  120. return hodXRClient(svcrgyUrl)
  121. def __get_service_status(self):
  122. serviceData = self.__get_service_data()
  123. status = True
  124. hdfs = False
  125. mapred = False
  126. for host in serviceData.keys():
  127. for item in serviceData[host]:
  128. service = item.keys()
  129. if service[0] == 'hdfs.grid' and \
  130. self.__cfg['gridservice-hdfs']['external'] == False:
  131. hdfs = True
  132. elif service[0] == 'mapred.grid':
  133. mapred = True
  134. if not mapred:
  135. status = "mapred"
  136. if not hdfs and self.__cfg['gridservice-hdfs']['external'] == False:
  137. if status != True:
  138. status = "mapred and hdfs"
  139. else:
  140. status = "hdfs"
  141. return status
  142. def __get_service_data(self):
  143. registry = to_http_url(self.__cfg['hod']['xrs-address'])
  144. serviceData = self.__svcrgyClient.getServiceInfo(
  145. self.__cfg['hod']['userid'], self.__setup.np.getNodePoolId())
  146. return serviceData
  147. def __check_allocation_manager(self):
  148. userValid = True
  149. try:
  150. self.serviceProxyClient = hodXRClient(
  151. to_http_url(self.__cfg['hod']['proxy-xrs-address']), None, None, 0,
  152. 0, 1, False, 15)
  153. userValid = self.serviceProxyClient.isProjectUserValid(
  154. self.__setup.cfg['hod']['userid'],
  155. self.__setup.cfg['resource_manager']['pbs-account'],True)
  156. if userValid:
  157. self.__log.debug("Validated that user %s is part of project %s." %
  158. (self.__cfg['hod']['userid'],
  159. self.__cfg['resource_manager']['pbs-account']))
  160. else:
  161. self.__log.debug("User %s is not part of project: %s." % (
  162. self.__cfg['hod']['userid'],
  163. self.__cfg['resource_manager']['pbs-account']))
  164. self.__log.error("Please specify a valid project in "
  165. + "resource_manager.pbs-account. If you still have "
  166. + "issues, please contact operations")
  167. userValidd = False
  168. # ignore invalid project for now - TODO
  169. except Exception:
  170. # ignore failures - non critical for now
  171. self.__log.debug(
  172. "Unable to contact Allocation Manager Proxy - ignoring...")
  173. #userValid = False
  174. return userValid
  175. def __check_job_status(self):
  176. initWaitCount = 20
  177. count = 0
  178. status = False
  179. state = 'Q'
  180. while state == 'Q':
  181. state = self.__nodePool.getJobState()
  182. if (state==False) or (state!='Q'):
  183. break
  184. count = count + 1
  185. if count < initWaitCount:
  186. time.sleep(0.5)
  187. else:
  188. time.sleep(10)
  189. if state and state != 'C':
  190. status = True
  191. return status
  192. def __get_ringmaster_client(self):
  193. ringmasterXRS = None
  194. ringList = self.__svcrgyClient.getServiceInfo(
  195. self.__cfg['ringmaster']['userid'], self.__nodePool.getServiceId(),
  196. 'ringmaster', 'hod')
  197. if ringList and len(ringList):
  198. if isinstance(ringList, list):
  199. ringmasterXRS = ringList[0]['xrs']
  200. else:
  201. count = 0
  202. waitTime = self.__cfg['hod']['allocate-wait-time']
  203. while count < waitTime:
  204. ringList = self.__svcrgyClient.getServiceInfo(
  205. self.__cfg['ringmaster']['userid'], self.__nodePool.getServiceId(),
  206. 'ringmaster',
  207. 'hod')
  208. if ringList and len(ringList):
  209. if isinstance(ringList, list):
  210. ringmasterXRS = ringList[0]['xrs']
  211. if ringmasterXRS is not None:
  212. break
  213. else:
  214. time.sleep(1)
  215. count = count + 1
  216. # check to see if the job exited by any chance in that time:
  217. if (count % 10 == 0):
  218. if not self.__check_job_status():
  219. break
  220. return ringmasterXRS
  221. def __init_hadoop_service(self, serviceName, xmlrpcClient):
  222. status = True
  223. serviceAddress = None
  224. serviceInfo = None
  225. for i in range(0, 250):
  226. try:
  227. serviceAddress = xmlrpcClient.getServiceAddr(serviceName)
  228. if serviceAddress:
  229. if serviceAddress == 'not found':
  230. time.sleep(.5)
  231. # check to see if the job exited by any chance in that time:
  232. if (i % 10 == 0):
  233. if not self.__check_job_status():
  234. break
  235. else:
  236. serviceInfo = xmlrpcClient.getURLs(serviceName)
  237. break
  238. except:
  239. self.__log.critical("'%s': ringmaster xmlrpc error." % serviceName)
  240. self.__log.debug(get_exception_string())
  241. status = False
  242. break
  243. if serviceAddress == 'not found' or not serviceAddress:
  244. self.__log.critical("Failed to retrieve '%s' service address." %
  245. serviceName)
  246. status = False
  247. else:
  248. try:
  249. self.__svcrgyClient.registerService(self.__cfg['hodring']['userid'],
  250. self.jobId, self.__hostname,
  251. serviceName, 'grid', serviceInfo)
  252. except:
  253. self.__log.critical("'%s': registry xmlrpc error." % serviceName)
  254. self.__log.debug(get_exception_string())
  255. status = False
  256. return status, serviceAddress, serviceInfo
  257. def __collect_jobtracker_ui(self, dir):
  258. link = self.mapredInfo + "/jobtracker.jsp"
  259. parser = miniHTMLParser()
  260. parser.setBaseUrl(self.mapredInfo)
  261. node_cache = {}
  262. self.__log.debug("collect_jobtracker_ui seeded with " + link)
  263. def alarm_handler(number, stack):
  264. raise AlarmException("timeout")
  265. signal.signal(signal.SIGALRM, alarm_handler)
  266. input = None
  267. while link:
  268. self.__log.debug("link: %s" % link)
  269. # taskstats.jsp,taskdetails.jsp not included since too many to collect
  270. if re.search(
  271. "jobfailures\.jsp|jobtracker\.jsp|jobdetails\.jsp|jobtasks\.jsp",
  272. link):
  273. for i in range(1,5):
  274. try:
  275. input = urllib.urlopen(link)
  276. break
  277. except:
  278. self.__log.debug(get_exception_string())
  279. time.sleep(1)
  280. if input:
  281. out = None
  282. self.__log.debug("collecting " + link + "...")
  283. filename = re.sub(self.mapredInfo, "", link)
  284. filename = dir + "/" + filename
  285. filename = re.sub("http://","", filename)
  286. filename = re.sub("[\?\&=:]","_",filename)
  287. filename = filename + ".html"
  288. try:
  289. tempdir, tail = os.path.split(filename)
  290. if not os.path.exists(tempdir):
  291. os.makedirs(tempdir)
  292. except:
  293. self.__log.debug(get_exception_string())
  294. out = open(filename, 'w')
  295. bufSz = 8192
  296. signal.alarm(10)
  297. try:
  298. self.__log.debug("Starting to grab: %s" % link)
  299. buf = input.read(bufSz)
  300. while len(buf) > 0:
  301. # Feed the file into the HTML parser
  302. parser.feed(buf)
  303. # Re-write the hrefs in the file
  304. p = re.compile("\?(.+?)=(.+?)")
  305. buf = p.sub(r"_\1_\2",buf)
  306. p= re.compile("&(.+?)=(.+?)")
  307. buf = p.sub(r"_\1_\2",buf)
  308. p = re.compile("http://(.+?):(\d+)?")
  309. buf = p.sub(r"\1_\2/",buf)
  310. buf = re.sub("href=\"/","href=\"",buf)
  311. p = re.compile("href=\"(.+?)\"")
  312. buf = p.sub(r"href=\1.html",buf)
  313. out.write(buf)
  314. buf = input.read(bufSz)
  315. signal.alarm(0)
  316. input.close()
  317. if out:
  318. out.close()
  319. self.__log.debug("Finished grabbing: %s" % link)
  320. except AlarmException:
  321. if out: out.close()
  322. if input: input.close()
  323. self.__log.debug("Failed to retrieve: %s" % link)
  324. else:
  325. self.__log.debug("Failed to retrieve: %s" % link)
  326. # Get the next link in level traversal order
  327. link = parser.getNextLink()
  328. parser.close()
  329. def check_cluster(self, clusterInfo):
  330. status = 0
  331. if 'mapred' in clusterInfo:
  332. mapredAddress = clusterInfo['mapred'][7:]
  333. hdfsAddress = clusterInfo['hdfs'][7:]
  334. mapredSocket = tcpSocket(mapredAddress)
  335. try:
  336. mapredSocket.open()
  337. mapredSocket.close()
  338. except tcpError:
  339. status = 14
  340. hdfsSocket = tcpSocket(hdfsAddress)
  341. try:
  342. hdfsSocket.open()
  343. hdfsSocket.close()
  344. except tcpError:
  345. if status > 0:
  346. status = 10
  347. else:
  348. status = 13
  349. if status == 0:
  350. status = 12
  351. else:
  352. status = 15
  353. return status
  354. def cleanup(self):
  355. if self.__nodePool: self.__nodePool.finalize()
  356. def get_job_id(self):
  357. return self.jobId
  358. def delete_job(self, jobId):
  359. '''Delete a job given it's ID'''
  360. ret = 0
  361. if self.__nodePool:
  362. ret = self.__nodePool.deleteJob(jobId)
  363. else:
  364. raise Exception("Invalid state: Node pool is not initialized to delete the given job.")
  365. return ret
  366. def allocate(self, clusterDir, min, max=None):
  367. status = 0
  368. self.__svcrgyClient = self.__get_svcrgy_client()
  369. self.__log.debug("allocate %s %s %s" % (clusterDir, min, max))
  370. if min < 3:
  371. self.__log.critical("Minimum nodes must be greater than 2.")
  372. status = 2
  373. else:
  374. if self.__check_allocation_manager():
  375. nodeSet = self.__nodePool.newNodeSet(min)
  376. self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet)
  377. if self.jobId:
  378. if self.__check_job_status():
  379. self.ringmasterXRS = self.__get_ringmaster_client()
  380. if self.ringmasterXRS:
  381. ringClient = hodXRClient(self.ringmasterXRS)
  382. hdfsStatus, hdfsAddr, self.hdfsInfo = \
  383. self.__init_hadoop_service('hdfs', ringClient)
  384. if hdfsStatus:
  385. mapredStatus, mapredAddr, self.mapredInfo = \
  386. self.__init_hadoop_service('mapred', ringClient)
  387. if mapredStatus:
  388. self.__log.info("HDFS UI on http://%s" % self.hdfsInfo)
  389. self.__log.info("Mapred UI on http://%s" % self.mapredInfo)
  390. # Go generate the client side hadoop-site.xml now
  391. # adding final-params as well, just so that conf on
  392. # client-side and server-side are (almost) the same
  393. clientParams = None
  394. serverParams = {}
  395. finalServerParams = {}
  396. # client-params
  397. if self.__cfg['hod'].has_key('client-params'):
  398. clientParams = self.__cfg['hod']['client-params']
  399. # server-params
  400. if self.__cfg['gridservice-mapred'].has_key('server-params'):
  401. serverParams.update(\
  402. self.__cfg['gridservice-mapred']['server-params'])
  403. if self.__cfg['gridservice-hdfs'].has_key('server-params'):
  404. # note that if there are params in both mapred and hdfs
  405. # sections, the ones in hdfs overwirte the ones in mapred
  406. serverParams.update(\
  407. self.__cfg['gridservice-mapred']['server-params'])
  408. # final-server-params
  409. if self.__cfg['gridservice-mapred'].has_key(\
  410. 'final-server-params'):
  411. finalServerParams.update(\
  412. self.__cfg['gridservice-mapred']['final-server-params'])
  413. if self.__cfg['gridservice-hdfs'].has_key(
  414. 'final-server-params'):
  415. finalServerParams.update(\
  416. self.__cfg['gridservice-hdfs']['final-server-params'])
  417. clusterFactor = self.__cfg['hod']['cluster-factor']
  418. self.__hadoopCfg.gen_site_conf(clusterDir, min,
  419. hdfsAddr, mapredAddr, clientParams,\
  420. serverParams, finalServerParams,\
  421. clusterFactor)
  422. # end of hadoop-site.xml generation
  423. else:
  424. status = 8
  425. else:
  426. status = 7
  427. else:
  428. status = 6
  429. if status != 0:
  430. self.__log.info("Cleaning up job id %s, as cluster could not be allocated." % self.jobId)
  431. self.delete_job(self.jobId)
  432. else:
  433. self.__log.critical("No job found, ringmaster failed to run.")
  434. status = 5
  435. elif self.jobId == False:
  436. if exitCode == 188:
  437. self.__log.critical("Request execeeded maximum resource allocation.")
  438. else:
  439. self.__log.critical("Insufficient resources available.")
  440. status = 4
  441. else:
  442. self.__log.critical("Scheduler failure, allocation failed.\n\n")
  443. status = 4
  444. else:
  445. status = 9
  446. return status
  447. def deallocate(self, clusterDir, clusterInfo):
  448. status = 0
  449. nodeSet = self.__nodePool.newNodeSet(clusterInfo['min'],
  450. id=clusterInfo['jobid'])
  451. self.mapredInfo = clusterInfo['mapred']
  452. self.hdfsInfo = clusterInfo['hdfs']
  453. try:
  454. if self.__cfg['hod'].has_key('hadoop-ui-log-dir'):
  455. clusterStatus = self.check_cluster(clusterInfo)
  456. if clusterStatus != 14 and clusterStatus != 10:
  457. # If JT is still alive
  458. self.__collect_jobtracker_ui(self.__cfg['hod']['hadoop-ui-log-dir'])
  459. else:
  460. self.__log.debug('hadoop-ui-log-dir not specified. Skipping Hadoop UI log collection.')
  461. except:
  462. self.__log.info("Exception in collecting Job tracker logs. Ignoring.")
  463. status = self.__nodePool.finalize()
  464. return status
  465. class hadoopScript:
  466. def __init__(self, conf, execDir):
  467. self.__environ = os.environ.copy()
  468. self.__environ['HADOOP_CONF_DIR'] = conf
  469. self.__execDir = execDir
  470. def run(self, script):
  471. scriptThread = simpleCommand(script, script, self.__environ, 4, False,
  472. False, self.__execDir)
  473. scriptThread.start()
  474. scriptThread.wait()
  475. scriptThread.join()
  476. return scriptThread.exit_code()