service.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. #Licensed to the Apache Software Foundation (ASF) under one
  2. #or more contributor license agreements. See the NOTICE file
  3. #distributed with this work for additional information
  4. #regarding copyright ownership. The ASF licenses this file
  5. #to you under the Apache License, Version 2.0 (the
  6. #"License"); you may not use this file except in compliance
  7. #with the License. You may obtain a copy of the License at
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #Unless required by applicable law or agreed to in writing, software
  10. #distributed under the License is distributed on an "AS IS" BASIS,
  11. #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. #See the License for the specific language governing permissions and
  13. #limitations under the License.
  14. """defines Service as abstract interface"""
  15. # -*- python -*-
  16. import random, socket
  17. class Service:
  18. """ the service base class that all the
  19. other services inherit from. """
  20. def __init__(self, serviceDesc, workDirs):
  21. self.serviceDesc = serviceDesc
  22. self.workDirs = workDirs
  23. def getName(self):
  24. return self.serviceDesc.getName()
  25. def getInfoAddrs(self):
  26. """Return a list of addresses that provide
  27. information about the servie"""
  28. return []
  29. def isLost(self):
  30. """True if the service is down"""
  31. raise NotImplementedError
  32. def addNodes(self, nodeList):
  33. """add nodeSet"""
  34. raise NotImplementedError
  35. def removeNodes(self, nodeList):
  36. """remove a nodeset"""
  37. raise NotImplementedError
  38. def getWorkers(self):
  39. raise NotImplementedError
  40. def needsMore(self):
  41. """return number of nodes the service wants to add"""
  42. raise NotImplementedError
  43. def needsLess(self):
  44. """return number of nodes the service wants to remove"""
  45. raise NotImplementedError
  46. class MasterSlave(Service):
  47. """ the base class for a master slave
  48. service architecture. """
  49. def __init__(self, serviceDesc, workDirs,requiredNode):
  50. Service.__init__(self, serviceDesc, workDirs)
  51. self.launchedMaster = False
  52. self.masterInitialized = False
  53. self.masterAddress = 'none'
  54. self.requiredNode = requiredNode
  55. def getRequiredNode(self):
  56. return self.requiredNode
  57. def getMasterRequest(self):
  58. """ the number of master you need
  59. to run for this service. """
  60. raise NotImplementedError
  61. def isLaunchable(self, serviceDict):
  62. """ if your service does not depend on
  63. other services. is set to true by default. """
  64. return True
  65. def getMasterCommands(self, serviceDict):
  66. """ a list of master commands you
  67. want to run for this service. """
  68. raise NotImplementedError
  69. def getAdminCommands(self, serviceDict):
  70. """ a list of admin commands you
  71. want to run for this service. """
  72. raise NotImplementedError
  73. def getWorkerCommands(self, serviceDict):
  74. """ a list of worker commands you want to
  75. run for this service. """
  76. raise NotImplementedError
  77. def setMasterNodes(self, list):
  78. """ set the status of master nodes
  79. after they start running on a node cluster. """
  80. raise NotImplementedError
  81. def addNodes(self, list):
  82. """ add nodes to a service. Not implemented
  83. currently. """
  84. raise NotImplementedError
  85. def getMasterAddrs(self):
  86. """ return the addresses of master. the
  87. hostname:port to which worker nodes should
  88. connect. """
  89. raise NotImplementedError
  90. def setMasterParams(self, list):
  91. """ set the various master params
  92. depending on what each hodring set
  93. the master params to. """
  94. raise NotImplementedError
  95. def setlaunchedMaster(self):
  96. """ set the status of master launched
  97. to true. """
  98. self.launchedMaster = True
  99. def isMasterLaunched(self):
  100. """ return if a master has been launched
  101. for the service or not. """
  102. return self.launchedMaster
  103. def isMasterInitialized(self):
  104. """ return if a master if launched
  105. has been initialized or not. """
  106. return self.masterInitialized
  107. def setMasterInitialized(self):
  108. """ set the master initialized to
  109. true. """
  110. self.masterInitialized = True
  111. def getMasterAddress(self):
  112. """ it needs to change to reflect
  113. more that one masters. Currently it
  114. keeps a knowledge of where the master
  115. was launched and to keep track if it was actually
  116. up or not. """
  117. return self.masterAddress
  118. def setMasterAddress(self, addr):
  119. self.masterAddress = addr
  120. def isExternal(self):
  121. return self.serviceDesc.isExternal()
  122. class NodeRequest:
  123. """ A class to define
  124. a node request. """
  125. def __init__(self, n, required = [], preferred = [], isPreemptee = True):
  126. self.numNodes = n
  127. self.preferred = preferred
  128. self.isPreemptee = isPreemptee
  129. self.required = required
  130. def setNumNodes(self, n):
  131. self.numNodes = n
  132. def setPreferredList(self, list):
  133. self.preferred = list
  134. def setIsPreemptee(self, flag):
  135. self.isPreemptee = flag
  136. class ServiceUtil:
  137. """ this class should be moved out of
  138. service.py to a util file"""
  139. localPortUsed = {}
  140. def getUniqRandomPort(h=None, low=50000, high=60000, retry = 30):
  141. """This allocates a randome free port between low and high"""
  142. while retry > 0:
  143. n = random.randint(low, high)
  144. if n in ServiceUtil.localPortUsed:
  145. retry -= 1
  146. continue
  147. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  148. if not h:
  149. h = socket.gethostname()
  150. avail = False
  151. try:
  152. s.connect((h, n))
  153. except:
  154. avail = True
  155. if avail:
  156. ServiceUtil.localPortUsed[n] = True
  157. return n
  158. raise ValueError, "Can't find unique local port between %d and %d" % (low, high)
  159. getUniqRandomPort = staticmethod(getUniqRandomPort)
  160. def getUniqPort(h=None, low=40000, high=60000, retry = 30):
  161. """get unique port on a host that can be used by service
  162. This and its consumer code should disappear when master
  163. nodes get allocatet by nodepool"""
  164. n = low
  165. while retry > 0:
  166. n = n + 1
  167. if n in ServiceUtil.localPortUsed:
  168. retry -= 1
  169. continue
  170. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  171. if not h:
  172. h = socket.gethostname()
  173. avail = False
  174. try:
  175. s.connect((h, n))
  176. except:
  177. avail = True
  178. if avail:
  179. ServiceUtil.localPortUsed[n] = True
  180. return n
  181. raise ValueError, "Can't find unique local port between %d and %d" % (low, high)
  182. getUniqPort = staticmethod(getUniqPort)