# idleJobTracker.py
#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements. See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership. The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
import os, re, time

from hodlib.Common.threads import loop, func
from hodlib.Common.threads import simpleCommand
from hodlib.Common.util import get_exception_string, hadoopVersion
  18. class HadoopJobStatus:
  19. """This class represents the status of a single Hadoop job"""
  20. def __init__(self, jobId, status):
  21. self.__jobId = jobId
  22. self.__status = status
  23. def getJobId(self):
  24. return self.__jobId
  25. def getStatus(self):
  26. return self.__status
  27. class HadoopClientException(Exception):
  28. """This class represents an exception that is raised when we fail in
  29. running the job client."""
  30. def __init__(self, errorCode):
  31. self.errorCode = errorCode
  32. class JobTrackerMonitor:
  33. """This class monitors the JobTracker of an allocated cluster
  34. periodically to detect whether it is idle. If it is found
  35. to be idle for more than a configured limit, it calls back
  36. registered handlers who can act upon the idle cluster."""
  37. def __init__(self, log, idleJTHandler, interval, limit,
  38. hadoopDir, javaHome, servInfoProvider):
  39. self.__log = log
  40. self.__idlenessLimit = limit
  41. self.__idleJobTrackerHandler = idleJTHandler
  42. self.__hadoopDir = hadoopDir
  43. hadoopPath = os.path.join(self.__hadoopDir, "bin", "hadoop")
  44. #hadoop directory can be from pkgs or a temp location like tarball. Verify once.
  45. if not os.path.exists(hadoopPath):
  46. raise Exception('Invalid Hadoop path specified: %s' % hadoopPath)
  47. self.__javaHome = javaHome
  48. # Note that when this object is created, we don't yet know the JT URL.
  49. # The service info provider will be polled until we get the URL.
  50. self.__serviceInfoProvider = servInfoProvider
  51. self.__jobCountRegExp = re.compile("([0-9]+) jobs currently running.*")
  52. self.__jobStatusRegExp = re.compile("(\S+)\s+(\d)\s+\d+\s+\S+$")
  53. self.__firstIdleTime = 0
  54. self.__hadoop15Version = { 'major' : '0', 'minor' : '15' }
  55. #Assumption: we are not going to support versions older than 0.15 for Idle Job tracker.
  56. if not self.__isCompatibleHadoopVersion(self.__hadoop15Version):
  57. raise Exception('Incompatible Hadoop Version: Cannot check status')
  58. self.__stopFlag = False
  59. self.__jtURLFinderThread = func(name='JTURLFinderThread', functionRef=self.getJobTrackerURL)
  60. self.__jtMonitorThread = loop(name='JTMonitorThread', functionRef=self.monitorJobTracker,
  61. sleep=interval)
  62. self.__jobTrackerURL = None
  63. def start(self):
  64. """This method starts a thread that will determine the JobTracker URL"""
  65. self.__jtURLFinderThread.start()
  66. def stop(self):
  67. self.__log.debug('Joining the monitoring thread.')
  68. self.__stopFlag = True
  69. if self.__jtMonitorThread.isAlive():
  70. self.__jtMonitorThread.join()
  71. self.__log.debug('Joined the monitoring thread.')
  72. def getJobTrackerURL(self):
  73. """This method periodically checks the service info provider for the JT URL"""
  74. self.__jobTrackerURL = self.__serviceInfoProvider.getServiceAddr('mapred')
  75. while not self.__stopFlag and \
  76. (self.__jobTrackerURL is None or \
  77. self.__jobTrackerURL == 'not found'):
  78. time.sleep(10)
  79. if not self.__stopFlag:
  80. self.__jobTrackerURL = self.__serviceInfoProvider.getServiceAddr('mapred')
  81. else:
  82. break
  83. if (self.__jobTrackerURL != None) and \
  84. (self.__jobTrackerURL != 'not found'):
  85. self.__log.debug('Got URL %s. Starting monitoring' % self.__jobTrackerURL)
  86. self.__jtMonitorThread.start()
  87. def monitorJobTracker(self):
  88. """This method is periodically called to monitor the JobTracker of the cluster."""
  89. try:
  90. if self.__isIdle():
  91. if self.__idleJobTrackerHandler:
  92. self.__log.info('Detected cluster as idle. Calling registered callback handler.')
  93. self.__idleJobTrackerHandler.handleIdleJobTracker()
  94. except:
  95. self.__log.debug('Exception while monitoring job tracker. %s' % get_exception_string())
  96. def getJobsStatus(self):
  97. """This method should return the status of all jobs that are run on the HOD allocated
  98. hadoop cluster"""
  99. jobStatusList = []
  100. try:
  101. hadoop16Version = { 'major' : '0', 'minor' : '16' }
  102. if self.__isCompatibleHadoopVersion(hadoop16Version):
  103. jtStatusCommand = self.__initStatusCommand(option='-list all')
  104. jtStatusCommand.start()
  105. jtStatusCommand.wait()
  106. jtStatusCommand.join()
  107. if jtStatusCommand.exit_code() == 0:
  108. for line in jtStatusCommand.output():
  109. jobStatus = self.__extractJobStatus(line)
  110. if jobStatus is not None:
  111. jobStatusList.append(jobStatus)
  112. except:
  113. self.__log.debug('Exception while getting job statuses. %s' % get_exception_string())
  114. return jobStatusList
  115. def __extractJobStatus(self, line):
  116. """This method parses an output line from the job status command and creates
  117. the JobStatus object if there is a match"""
  118. jobStatus = None
  119. line = line.strip()
  120. jsMatch = self.__jobStatusRegExp.match(line)
  121. if jsMatch:
  122. jobStatus = HadoopJobStatus(jsMatch.group(1), int(jsMatch.group(2)))
  123. return jobStatus
  124. def __isIdle(self):
  125. """This method checks if the JobTracker is idle beyond a certain limit."""
  126. jobCount = 0
  127. err = False
  128. try:
  129. jobCount = self.__getJobCount()
  130. except HadoopClientException, hce:
  131. self.__log.debug('HadoopClientException handled in getting job count. \
  132. Error code: %s' % hce.errorCode)
  133. err = True
  134. if (jobCount==0) or err:
  135. if self.__firstIdleTime == 0:
  136. #detecting idleness for the first time
  137. self.__firstIdleTime = time.time()
  138. else:
  139. if ((time.time()-self.__firstIdleTime) >= self.__idlenessLimit):
  140. self.__log.info('Idleness limit crossed for cluster')
  141. return True
  142. else:
  143. # reset idleness time
  144. self.__firstIdleTime = 0
  145. return False
  146. def __getJobCount(self):
  147. """This method executes the hadoop job -list command and parses the output to detect
  148. the number of running jobs."""
  149. # We assume here that the poll interval is small enough to detect running jobs.
  150. # If jobs start and stop within the poll interval, the cluster would be incorrectly
  151. # treated as idle. Hadoop 2266 will provide a better mechanism than this.
  152. jobs = -1
  153. jtStatusCommand = self.__initStatusCommand()
  154. jtStatusCommand.start()
  155. jtStatusCommand.wait()
  156. jtStatusCommand.join()
  157. if jtStatusCommand.exit_code() == 0:
  158. for line in jtStatusCommand.output():
  159. match = self.__jobCountRegExp.match(line)
  160. if match:
  161. jobs = int(match.group(1))
  162. elif jtStatusCommand.exit_code() == 1:
  163. # for now, exit code 1 comes for any exception raised by JobClient. If hadoop gets
  164. # to differentiate and give more granular exit codes, we can check for those errors
  165. # corresponding to network errors etc.
  166. raise HadoopClientException(jtStatusCommand.exit_code())
  167. return jobs
  168. def __isCompatibleHadoopVersion(self, expectedVersion):
  169. """This method determines whether the version of hadoop being used is one that
  170. is higher than the expectedVersion.
  171. This can be used for checking if a particular feature is available or not"""
  172. ver = hadoopVersion(self.__hadoopDir, self.__javaHome, self.__log)
  173. ret = False
  174. if (ver['major']!=None) and (int(ver['major']) >= int(expectedVersion['major'])) \
  175. and (ver['minor']!=None) and (int(ver['minor']) >= int(expectedVersion['minor'])):
  176. ret = True
  177. return ret
  178. def __initStatusCommand(self, option="-list"):
  179. """This method initializes the command to run to check the JT status"""
  180. cmd = None
  181. hadoopPath = os.path.join(self.__hadoopDir, 'bin', 'hadoop')
  182. cmdStr = "%s job -jt %s" % (hadoopPath, self.__jobTrackerURL)
  183. cmdStr = "%s %s" % (cmdStr, option)
  184. self.__log.debug('cmd str %s' % cmdStr)
  185. env = os.environ
  186. env['JAVA_HOME'] = self.__javaHome
  187. cmd = simpleCommand('HadoopStatus', cmdStr, env)
  188. return cmd