scheduler.py

#!/usr/bin/python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Schedule FailMon execution for the nodes listed in file hosts.list,
# according to the properties file conf/global.config.

import time
import ConfigParser
import subprocess
import threading
import random

# Default values for the configuration parameters; they are overridden
# by the values read from conf/global.config in getJobs().
jobs = []
username = "user"
connections = 10
failmonDir = ""
maxFiles = 100
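
# The script is expected to run from the FailMon installation directory,
# since it opens ./conf/global.config and ./conf/hosts.list relative to
# the current directory. hosts.list simply names one target node per
# line; the entries below are illustrative placeholders, not real hosts:
#
#   node001.example.com
#   node002.example.com
#   node003.example.com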

# This class represents a thread that connects to a set of cluster
# nodes to locally execute monitoring jobs. These jobs are specified
# as a shell command in the constructor.
class sshThread(threading.Thread):

    def __init__(self, threadname, username, command, failmonDir):
        threading.Thread.__init__(self)
        self.name = threadname
        self.username = username
        self.command = command
        self.failmonDir = failmonDir
        self.hosts = []

    def addHost(self, host):
        self.hosts.append(host)

    def run(self):
        for host in self.hosts:
            toRun = ["ssh", self.username + "@" + host, "cd " + self.failmonDir + " ; " + self.command]
            print "Thread", self.name, "invoking command on", host, ":\t", toRun, "...",
            subprocess.check_call(toRun)
            print "Done!"
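
# For illustration only (the hostname, user and directory are made-up
# placeholders): with host "node001.example.com", username "hadoop",
# failmonDir "/home/hadoop/failmon" and command "bin/failmon.sh --only cpu",
# run() above is equivalent to invoking
#
#   ssh hadoop@node001.example.com "cd /home/hadoop/failmon ; bin/failmon.sh --only cpu"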

# This class represents a monitoring job. The param member is a string
# that can be passed in the '--only' list of jobs given to the Java
# class org.apache.hadoop.contrib.failmon.RunOnce for execution on a
# node.
class Job:

    def __init__(self, param, interval):
        self.param = param
        self.interval = interval
        self.counter = interval
        return

    def reset(self):
        self.counter = self.interval
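
# For illustration only: if, say, the "cpu" and "upload" jobs come due
# on the same tick, main() below assembles a command of the form
#
#   bin/failmon.sh --only cpu,upload
#
# which each sshThread then executes remotely inside failmonDir.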

# This function reads the configuration file to get the values of the
# configuration parameters.
def getJobs(file):
    global username
    global connections
    global jobs
    global failmonDir
    global maxFiles

    conf = ConfigParser.SafeConfigParser()
    conf.read(file)

    username = conf.get("Default", "ssh.username")
    connections = int(conf.get("Default", "max.connections"))
    failmonDir = conf.get("Default", "failmon.dir")
    maxFiles = conf.get("Default", "hdfs.files.max")

    # Hadoop log
    interval = int(conf.get("Default", "log.hadoop.interval"))
    if interval != 0:
        jobs.append(Job("hadoopLog", interval))

    # System log
    interval = int(conf.get("Default", "log.system.interval"))
    if interval != 0:
        jobs.append(Job("systemLog", interval))

    # NICs
    interval = int(conf.get("Default", "nics.interval"))
    if interval != 0:
        jobs.append(Job("nics", interval))

    # CPU
    interval = int(conf.get("Default", "cpu.interval"))
    if interval != 0:
        jobs.append(Job("cpu", interval))

    # Disks
    interval = int(conf.get("Default", "disks.interval"))
    if interval != 0:
        jobs.append(Job("disks", interval))

    # Sensors
    interval = int(conf.get("Default", "sensors.interval"))
    if interval != 0:
        jobs.append(Job("sensors", interval))

    # Upload
    interval = int(conf.get("Default", "upload.interval"))
    if interval != 0:
        jobs.append(Job("upload", interval))

    return
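
# A sketch of the [Default] section that this function expects in
# conf/global.config; the key names come from the calls above, while the
# values shown here are only illustrative:
#
#   [Default]
#   ssh.username = hadoop
#   max.connections = 10
#   failmon.dir = /home/hadoop/failmon
#   hdfs.files.max = 100
#   log.hadoop.interval = 3600
#   log.system.interval = 3600
#   nics.interval = 300
#   cpu.interval = 300
#   disks.interval = 300
#   sensors.interval = 300
#   upload.interval = 1800
#
# An interval of 0 disables the corresponding job.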

# Compute the GCD (greatest common divisor) of two integers.
def GCD(a, b):
    assert isinstance(a, int)
    assert isinstance(b, int)

    while a:
        a, b = b % a, a

    return b

# Compute the GCD (greatest common divisor) of the intervals of a list
# of monitoring jobs.
def listGCD(joblist):
    assert isinstance(joblist, list)

    if len(joblist) == 1:
        return joblist[0].interval

    g = GCD(joblist[0].interval, joblist[1].interval)

    for i in range(2, len(joblist)):
        g = GCD(g, joblist[i].interval)

    return g
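
# Worked example (with made-up intervals): for jobs configured at 60, 90
# and 120 seconds, listGCD() gives GCD(GCD(60, 90), 120) = GCD(30, 120)
# = 30, so main() below wakes up every 30 seconds and each job's counter
# reaches zero exactly on its own interval.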

# Merge all FailMon files created on HDFS into a single file.
def mergeFiles():
    global username
    global failmonDir

    hostList = []
    hosts = open('./conf/hosts.list', 'r')
    for host in hosts:
        hostList.append(host.strip())
    randomHost = random.sample(hostList, 1)

    mergeCommand = "bin/failmon.sh --mergeFiles"
    toRun = ["ssh", username + "@" + randomHost[0], "cd " + failmonDir + " ; " + mergeCommand]
    print "Invoking command on", randomHost, ":\t", mergeCommand, "...",
    subprocess.check_call(toRun)
    print "Done!"

    return

# The actual scheduling is done here.
def main():
    getJobs("./conf/global.config")
    for job in jobs:
        print "Configuration: ", job.param, "every", job.interval, "seconds"

    # The scheduler wakes up every globalInterval seconds, the GCD of
    # all job intervals, and runs whichever jobs have come due.
    globalInterval = listGCD(jobs)

    while True:
        time.sleep(globalInterval)
        params = []

        # Decrement the counters and collect the jobs that are due.
        for job in jobs:
            job.counter -= globalInterval
            if job.counter <= 0:
                params.append(job.param)
                job.reset()

        if len(params) == 0:
            continue

        # Build the '--only' argument list for bin/failmon.sh.
        onlyStr = "--only " + params[0]
        for i in range(1, len(params)):
            onlyStr += ',' + params[i]

        command = "bin/failmon.sh " + onlyStr

        # Execute on all nodes listed in hosts.list.
        hosts = open('./conf/hosts.list', 'r')
        threadList = []

        # Create a thread for every connection.
        for i in range(0, connections):
            threadList.append(sshThread(i, username, command, failmonDir))

        # Assign the hosts to the threads in round-robin fashion.
        cur = 0
        for host in hosts:
            threadList[cur].addHost(host.strip())
            cur += 1
            if cur == len(threadList):
                cur = 0

        for ready in threadList:
            ready.start()

        # Wait for all ssh threads of this round to finish.
        for ssht in threading.enumerate():
            if ssht != threading.currentThread():
                ssht.join()

        # If an upload has been done, we may need to merge the HDFS files.
        if "upload" in params:
            mergeFiles()

    return

if __name__ == '__main__':
    main()