torque.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. #Licensed to the Apache Software Foundation (ASF) under one
  2. #or more contributor license agreements. See the NOTICE file
  3. #distributed with this work for additional information
  4. #regarding copyright ownership. The ASF licenses this file
  5. #to you under the Apache License, Version 2.0 (the
  6. #"License"); you may not use this file except in compliance
  7. #with the License. You may obtain a copy of the License at
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #Unless required by applicable law or agreed to in writing, software
  10. #distributed under the License is distributed on an "AS IS" BASIS,
  11. #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. #See the License for the specific language governing permissions and
  13. #limitations under the License.
  14. """Maui/Torque implementation of NodePool"""
  15. # -*- python -*-
  16. import os, sys, csv, socket, time, re, pprint
  17. from hodlib.Hod.nodePool import *
  18. from hodlib.Schedulers.torque import torqueInterface
  19. from hodlib.Common.threads import simpleCommand
  20. from hodlib.Common.util import get_exception_string, args_to_string, local_fqdn
  21. class TorqueNodeSet(NodeSet):
  22. def __init__(self, id, numNodes, preferredList, isPreemptee):
  23. NodeSet.__init__(self, id, numNodes, preferredList, isPreemptee)
  24. self.qsubId = None
  25. self.addrList = []
  26. def _setQsubId(self, qsubId):
  27. self.qsubId = qsubId
  28. def _setAddrList(self, addrList):
  29. self.addrList = addrList
  30. def getAddrList(self):
  31. return self.addrList
  32. class TorquePool(NodePool):
  33. def __init__(self, nodePoolDesc, cfg, log):
  34. NodePool.__init__(self, nodePoolDesc, cfg, log)
  35. environ = os.environ.copy()
  36. if self._cfg['resource_manager'].has_key('pbs-server'):
  37. environ['PBS_DEFAULT'] = self._cfg['resource_manager']['pbs-server']
  38. self.__torque = torqueInterface(
  39. self._cfg['resource_manager']['batch-home'], environ, self._log)
  40. def __gen_submit_params(self, nodeSet, walltime = None, qosLevel = None,
  41. account = None):
  42. argList = []
  43. stdinList = []
  44. npd = self.nodePoolDesc
  45. def gen_stdin_list():
  46. # Here we are basically generating the standard input for qsub.
  47. # Specifically a script to exec ringmaster.
  48. stdinList.append('#!/bin/sh')
  49. ringBin = os.path.join(self._cfg['hod']['base-dir'], 'bin',
  50. 'ringmaster')
  51. ringArgs = [ringBin,]
  52. ringArgs.extend(self._cfg.get_args(exclude=('hod')))
  53. ringMasterCommand = args_to_string(ringArgs)
  54. self._log.debug("ringmaster cmd: %s" % ringMasterCommand)
  55. stdinList.append(ringMasterCommand)
  56. def gen_arg_list():
  57. def process_qsub_attributes():
  58. rawAttributes = self.nodePoolDesc.getAttrs()
  59. # 'W:x' is used to specify torque management extentensions ie -W x= ...
  60. resourceManagementExtensions = ''
  61. if 'W:x' in rawAttributes:
  62. resourceManagementExtensions = rawAttributes['W:x']
  63. if qosLevel:
  64. if len(resourceManagementExtensions) > 0:
  65. resourceManagementExtensions += ';'
  66. resourceManagementExtensions += 'QOS:%s' % (qosLevel)
  67. rawAttributes['W:x'] = resourceManagementExtensions
  68. hostname = local_fqdn()
  69. rawAttributes['l:nodes'] = nodeSet._getNumNodes()
  70. if walltime:
  71. rawAttributes['l:walltime'] = walltime
  72. #create a dict of dictionaries for
  73. # various arguments of torque
  74. cmds = {}
  75. for key in rawAttributes:
  76. value = rawAttributes[key]
  77. if key.find(':') == -1:
  78. raise ValueError, 'Syntax error: missing colon after %s in %s=%s' % (
  79. key, key, value)
  80. [option, subOption] = key.split(':', 1)
  81. if not option in cmds:
  82. cmds[option] = {}
  83. cmds[option][subOption] = value
  84. opts = []
  85. #create a string from this
  86. #dictionary of dictionaries createde above
  87. for k in cmds:
  88. csv = []
  89. nv = cmds[k]
  90. for n in nv:
  91. v = nv[n]
  92. if len(n) == 0:
  93. csv.append(v)
  94. else:
  95. csv.append('%s=%s' % (n, v))
  96. opts.append('-%s' % (k))
  97. opts.append(','.join(csv))
  98. for option in cmds:
  99. commandList = []
  100. for subOption in cmds[option]:
  101. value = cmds[option][subOption]
  102. if len(subOption) == 0:
  103. commandList.append(value)
  104. else:
  105. commandList.append("%s=%s" % (subOption, value))
  106. opts.append('-%s' % option)
  107. opts.append(','.join(commandList))
  108. return opts
  109. pkgdir = npd.getPkgDir()
  110. qsub = os.path.join(pkgdir, 'bin', 'qsub')
  111. sdd = self._cfg['servicedesc']
  112. gsvc = None
  113. for key in sdd:
  114. gsvc = sdd[key]
  115. break
  116. argList.extend(process_qsub_attributes())
  117. argList.extend(('-N', '"' + self._cfg['hod']['title'] + '"'))
  118. argList.extend(('-r','n'))
  119. if 'pbs-user' in self._cfg['resource_manager']:
  120. argList.extend(('-u', self._cfg['resource_manager']['pbs-user']))
  121. argList.extend(('-d','/tmp/'))
  122. if 'queue' in self._cfg['resource_manager']:
  123. queue = self._cfg['resource_manager']['queue']
  124. argList.extend(('-q',queue))
  125. # In HOD 0.4, we pass in an account string only if it is mentioned.
  126. # Also, we don't append userid to the account string, as HOD jobs run as the
  127. # user running them, not as 'HOD' user.
  128. if self._cfg['resource_manager'].has_key('pbs-account'):
  129. argList.extend(('-A', (self._cfg['resource_manager']['pbs-account'])))
  130. if 'env-vars' in self._cfg['resource_manager']:
  131. qsub_envs = self._cfg['resource_manager']['env-vars']
  132. argList.extend(('-v', self.__keyValToString(qsub_envs)))
  133. gen_arg_list()
  134. gen_stdin_list()
  135. return argList, stdinList
  136. def __keyValToString(self, keyValList):
  137. ret = ""
  138. for key in keyValList:
  139. ret = "%s%s=%s," % (ret, key, keyValList[key])
  140. return ret[:-1]
  141. def newNodeSet(self, numNodes, preferred=[], isPreemptee=True, id=None):
  142. if not id:
  143. id = self.getNextNodeSetId()
  144. nodeSet = TorqueNodeSet(id, numNodes, preferred, isPreemptee)
  145. self.nodeSetDict[nodeSet.getId()] = nodeSet
  146. return nodeSet
  147. def submitNodeSet(self, nodeSet, walltime = None, qosLevel = None,
  148. account = None):
  149. argList, stdinList = self.__gen_submit_params(nodeSet, walltime, qosLevel,
  150. account)
  151. jobId, exitCode = self.__torque.qsub(argList, stdinList)
  152. nodeSet.qsubId = jobId
  153. return jobId, exitCode
  154. def freeNodeSet(self, nodeSet):
  155. exitCode = self.deleteJob(nodeSet.getId())
  156. del self.nodeSetDict[nodeSet.getId()]
  157. return exitCode
  158. def finalize(self):
  159. status = 0
  160. exitCode = 0
  161. for nodeSet in self.nodeSetDict.values():
  162. exitCode = self.freeNodeSet(nodeSet)
  163. if exitCode > 0 and exitCode != 153:
  164. status = 4
  165. return status
  166. def getWorkers(self):
  167. hosts = []
  168. qstatInfo = self.__torque(self.getServiceId())
  169. if qstatInfo:
  170. hosts = qstatInfop['exec_host']
  171. return hosts
  172. def pollNodeSet(self, nodeSet):
  173. status = NodeSet.COMPLETE
  174. nodeSet = self.nodeSetDict[0]
  175. qstatInfo = self.__torque(self.getServiceId())
  176. if qstatMap:
  177. jobstate = qstatMap['job_state']
  178. exechost = qstatMap['exec_host']
  179. if jobstate == 'Q':
  180. status = NodeSet.PENDING
  181. elif exechost == None:
  182. status = NodeSet.COMMITTED
  183. else:
  184. nodeSet._setAddrList(exec_host)
  185. return status
  186. def getServiceId(self):
  187. id = None
  188. nodeSets = self.nodeSetDict.values()
  189. if len(nodeSets):
  190. id = nodeSets[0].qsubId
  191. if id == None:
  192. id = os.getenv('PBS_JOBID')
  193. return id
  194. def getJobState(self):
  195. #torque error code when credentials fail, a temporary condition sometimes.
  196. credFailureErrorCode = 171
  197. credFailureRetries = 10
  198. i = 0
  199. jobState = False
  200. while i < credFailureRetries:
  201. qstatInfo, exitCode = self.__torque.qstat(self.getServiceId())
  202. if exitCode == 0:
  203. jobState = qstatInfo['job_state']
  204. break
  205. else:
  206. if exitCode == credFailureErrorCode:
  207. time.sleep(1)
  208. i = i+1
  209. else:
  210. break
  211. return jobState
  212. def deleteJob(self, jobId):
  213. exitCode = self.__torque.qdel(jobId)
  214. return exitCode
  215. def runWorkers(self, args):
  216. return self.__torque.pbsdsh(args)
  217. def updateWorkerInfo(self, workerInfoMap, jobId):
  218. workerInfoStr = ''
  219. for key in workerInfoMap.keys():
  220. workerInfoStr = '%s,%s:%s' % (workerInfoStr, key, workerInfoMap[key])
  221. exitCode = self.__torque.qalter("notes", workerInfoStr[1:], jobId)
  222. return exitCode