# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
- """defines Service as abstract interface"""
- # -*- python -*-
- import random, socket
class Service:
    """Base class that every other service inherits from.

    It stores the service descriptor and the work directories handed to
    the constructor.  Apart from getName() and getInfoAddrs(), every hook
    raises NotImplementedError and has to be supplied by a subclass.
    """

    def __init__(self, serviceDesc, workDirs):
        """Remember the descriptor and work directories for later use."""
        self.workDirs = workDirs
        self.serviceDesc = serviceDesc

    def getName(self):
        """Return the name reported by the service descriptor."""
        descriptor = self.serviceDesc
        return descriptor.getName()

    def getInfoAddrs(self):
        """Return a list of addresses that provide information about
        the service.  The base class has none, so the list is empty."""
        return list()

    def isLost(self):
        """True if the service is down (subclass hook)."""
        raise NotImplementedError()

    def addNodes(self, nodeList):
        """Add a node set to the service (subclass hook)."""
        raise NotImplementedError()

    def removeNodes(self, nodeList):
        """Remove a node set from the service (subclass hook)."""
        raise NotImplementedError()

    def getWorkers(self):
        """Subclass hook; not implemented in the base class."""
        raise NotImplementedError()

    def needsMore(self):
        """Return the number of nodes the service wants to add
        (subclass hook)."""
        raise NotImplementedError()

    def needsLess(self):
        """Return the number of nodes the service wants to remove
        (subclass hook)."""
        raise NotImplementedError()
class MasterSlave(Service):
    """Base class for services built as a master with slave workers.

    On top of what Service stores, it tracks whether the master has been
    launched and initialized, the address the master was launched at, and
    the node the service requires.  Command and node hooks raise
    NotImplementedError until a subclass provides them.
    """

    def __init__(self, serviceDesc, workDirs,requiredNode):
        """Initialise the base service and reset the master bookkeeping."""
        Service.__init__(self, serviceDesc, workDirs)
        self.requiredNode = requiredNode
        # 'none' is the placeholder used until setMasterAddress() is called.
        self.masterAddress = 'none'
        self.masterInitialized = False
        self.launchedMaster = False

    # ---- stored state accessors -------------------------------------

    def getRequiredNode(self):
        """Return the required node given to the constructor."""
        return self.requiredNode

    def setlaunchedMaster(self):
        """Mark the master as launched."""
        self.launchedMaster = True

    def isMasterLaunched(self):
        """Tell whether a master has been launched for the service."""
        return self.launchedMaster

    def setMasterInitialized(self):
        """Mark the master as initialized."""
        self.masterInitialized = True

    def isMasterInitialized(self):
        """Tell whether a launched master has been initialized."""
        return self.masterInitialized

    def setMasterAddress(self, addr):
        """Record the address the master was launched at."""
        self.masterAddress = addr

    def getMasterAddress(self):
        """Return the recorded master address.

        Only a single address is kept; this needs to change to support
        more than one master.  It records where the master was launched
        so callers can track whether it actually came up.
        """
        return self.masterAddress

    def isExternal(self):
        """Return what the service descriptor reports for isExternal()."""
        descriptor = self.serviceDesc
        return descriptor.isExternal()

    def isLaunchable(self, serviceDict):
        """Tell whether the service can be launched.

        True by default, i.e. the service does not depend on other
        services; subclasses may override.
        """
        return True

    # ---- hooks to be provided by subclasses -------------------------

    def getMasterRequest(self):
        """Number of masters that need to run for this service
        (subclass hook)."""
        raise NotImplementedError()

    def getMasterCommands(self, serviceDict):
        """List of master commands to run for this service
        (subclass hook)."""
        raise NotImplementedError()

    def getAdminCommands(self, serviceDict):
        """List of admin commands to run for this service
        (subclass hook)."""
        raise NotImplementedError()

    def getWorkerCommands(self, serviceDict):
        """List of worker commands to run for this service
        (subclass hook)."""
        raise NotImplementedError()

    def setMasterNodes(self, list):
        """Set the status of master nodes after they start running on a
        node cluster (subclass hook)."""
        raise NotImplementedError()

    def addNodes(self, list):
        """Add nodes to a service.  Not implemented currently."""
        raise NotImplementedError()

    def getMasterAddrs(self):
        """Addresses of the master, i.e. the hostname:port worker nodes
        should connect to (subclass hook)."""
        raise NotImplementedError()

    def setMasterParams(self, list):
        """Set the various master params depending on what each hodring
        set them to (subclass hook)."""
        raise NotImplementedError()
-
class NodeRequest:
    """Describes a request for nodes.

    Attributes:
      numNodes    -- value passed as ``n``.
      required    -- list passed as ``required`` (fresh empty list if omitted).
      preferred   -- list passed as ``preferred`` (fresh empty list if omitted).
      isPreemptee -- flag passed as ``isPreemptee`` (defaults to True).
    """

    def __init__(self, n, required = None, preferred = None, isPreemptee = True):
        """Store the request parameters.

        ``required`` and ``preferred`` default to None rather than ``[]``:
        a literal list default is created once and shared by every
        instance, so appending to one request's list would silently
        change all other requests built with the default.
        """
        if required is None:
            required = []
        if preferred is None:
            preferred = []
        self.numNodes = n
        self.preferred = preferred
        self.isPreemptee = isPreemptee
        self.required = required

    def setNumNodes(self, n):
        """Replace the stored node count."""
        self.numNodes = n

    def setPreferredList(self, list):
        """Replace the preferred list.

        The parameter keeps its original name (which shadows the builtin)
        so keyword callers keep working.
        """
        self.preferred = list

    def setIsPreemptee(self, flag):
        """Replace the preemptee flag."""
        self.isPreemptee = flag
class ServiceUtil:
    """Port-picking helpers.

    This class should be moved out of service.py to a util file.
    """

    # Ports already handed out by this process, keyed by port number.
    localPortUsed = {}

    def _isPortFree(host, port):
        """Return True when a TCP connect to (host, port) fails.

        A failed connect (any socket.error) is taken to mean nothing is
        listening there.  The probing socket is always closed so repeated
        probes do not leak descriptors.
        """
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            try:
                probe.connect((host, port))
            except socket.error:
                return True
            return False
        finally:
            probe.close()

    _isPortFree = staticmethod(_isPortFree)

    def getUniqRandomPort(h=None, low=50000, high=60000, retry = 30):
        """Allocate a random free port between low and high (inclusive).

        h     -- host to probe; defaults to the local host name.
        retry -- maximum number of candidate ports to try.

        Returns the port and records it in localPortUsed.  Raises
        ValueError when no candidate succeeded within ``retry`` attempts.
        """
        if not h:
            h = socket.gethostname()
        while retry > 0:
            n = random.randint(low, high)
            if n not in ServiceUtil.localPortUsed and ServiceUtil._isPortFree(h, n):
                ServiceUtil.localPortUsed[n] = True
                return n
            # Every failed attempt consumes a retry, whether the port was
            # already handed out or is busy, so the loop always terminates.
            retry -= 1
        raise ValueError("Can't find unique local port between %d and %d" % (low, high))

    getUniqRandomPort = staticmethod(getUniqRandomPort)

    def getUniqPort(h=None, low=40000, high=60000, retry = 30):
        """Get a unique port on a host that can be used by a service.

        Candidates are tried in order from low + 1 up to high.  Only ports
        already recorded in localPortUsed consume a retry; ports that are
        merely busy are skipped.  Raises ValueError when the retries or
        the range are exhausted.

        This and its consumer code should disappear when master nodes get
        allocated by nodepool.
        """
        if not h:
            h = socket.gethostname()
        n = low
        # Stop at ``high`` so the scan cannot run past the requested range.
        while retry > 0 and n < high:
            n = n + 1
            if n in ServiceUtil.localPortUsed:
                retry -= 1
                continue
            if ServiceUtil._isPortFree(h, n):
                ServiceUtil.localPortUsed[n] = True
                return n
        raise ValueError("Can't find unique local port between %d and %d" % (low, high))

    getUniqPort = staticmethod(getUniqPort)
|