|
@@ -18,7 +18,6 @@ See the License for the specific language governing permissions and
|
|
|
limitations under the License.
|
|
|
'''
|
|
|
|
|
|
-from pwd import getpwnam
|
|
|
import logging
|
|
|
import subprocess
|
|
|
import os
|
|
@@ -30,6 +29,10 @@ import time
|
|
|
import traceback
|
|
|
import AmbariConfig
|
|
|
|
|
|
+try:
|
|
|
+ import pwd
|
|
|
+except ImportError:
|
|
|
+ import winpwd as pwd
|
|
|
|
|
|
global serverTracker
|
|
|
serverTracker = {}
|
|
@@ -77,7 +80,7 @@ class shellRunner:
|
|
|
def run(self, script, user=None):
|
|
|
try:
|
|
|
if user!=None:
|
|
|
- user=getpwnam(user)[2]
|
|
|
+ user = pwd.getpwnam(user)[2]
|
|
|
else:
|
|
|
user = os.getuid()
|
|
|
threadLocal.uid = user
|
|
@@ -93,202 +96,5 @@ class shellRunner:
|
|
|
logger.debug("Exitcode for %s is %d" % (cmd,code))
|
|
|
return {'exitCode': code, 'output': out, 'error': err}
|
|
|
|
|
|
- # dispatch action types
|
|
|
- def runAction(self, clusterId, component, role,
|
|
|
- user, command, cleanUpCommand, result):
|
|
|
- oldDir = os.getcwd()
|
|
|
- #TODO: handle this better. Don't like that it is doing a chdir for the main process
|
|
|
- os.chdir(self.getWorkDir(clusterId, role))
|
|
|
- try:
|
|
|
- if user is not None:
|
|
|
- user=getpwnam(user)[2]
|
|
|
- else:
|
|
|
- user = oldUid
|
|
|
- threadLocal.uid = user
|
|
|
- except Exception:
|
|
|
- logger.warn("%s %s %s can not switch user for RUN_ACTION."
|
|
|
- % (clusterId, component, role))
|
|
|
- code = 0
|
|
|
- cmd = sys.executable
|
|
|
- tempfilename = tempfile.mktemp()
|
|
|
- tmp = open(tempfilename, 'w')
|
|
|
- tmp.write(command['script'])
|
|
|
- tmp.close()
|
|
|
- cmd = "%s %s %s" % (cmd, tempfilename, " ".join(command['param']))
|
|
|
- commandResult = {}
|
|
|
- p = subprocess.Popen(cmd, preexec_fn=changeUid, stdout=subprocess.PIPE,
|
|
|
- stderr=subprocess.PIPE, shell=True, close_fds=True)
|
|
|
- out, err = p.communicate()
|
|
|
- code = p.wait()
|
|
|
- if code != 0:
|
|
|
- commandResult['output'] = out
|
|
|
- commandResult['error'] = err
|
|
|
- commandResult['exitCode'] = code
|
|
|
- result['commandResult'] = commandResult
|
|
|
- os.unlink(tempfilename)
|
|
|
- if code != 0:
|
|
|
- tempfilename = tempfile.mktemp()
|
|
|
- tmp = open(tempfilename, 'w')
|
|
|
- tmp.write(command['script'])
|
|
|
- tmp.close()
|
|
|
- cmd = sys.executable
|
|
|
- cmd = "%s %s %s" % (cmd, tempfilename, " ".join(cleanUpCommand['param']))
|
|
|
- cleanUpCode = 0
|
|
|
- cleanUpResult = {}
|
|
|
- p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
|
|
- shell=True, close_fds=True)
|
|
|
- out, err = p.communicate()
|
|
|
- cleanUpCode = p.wait()
|
|
|
- if cleanUpCode != 0:
|
|
|
- cleanUpResult['output'] = out
|
|
|
- cleanUpResult['error'] = err
|
|
|
- cleanUpResult['exitCode'] = cleanUpCode
|
|
|
- result['cleanUpResult'] = cleanUpResult
|
|
|
- os.unlink(tempfilename)
|
|
|
- os._exit(1)
|
|
|
- try:
|
|
|
- os.chdir(oldDir)
|
|
|
- except Exception:
|
|
|
- logger.warn("%s %s %s can not restore environment for RUN_ACTION."
|
|
|
- % (clusterId, component, role))
|
|
|
- return result
|
|
|
-
|
|
|
- # Start a process and presist its state
|
|
|
- def startProcess(self, clusterId, clusterDefinitionRevision, component,
|
|
|
- role, script, user, result):
|
|
|
- global serverTracker
|
|
|
- oldDir = os.getcwd()
|
|
|
- try:
|
|
|
- os.chdir(self.getWorkDir(clusterId,role))
|
|
|
- except Exception:
|
|
|
- logger.warn("%s %s %s can not switch dir for START_ACTION."
|
|
|
- % (clusterId, component, role))
|
|
|
- oldUid = os.getuid()
|
|
|
- try:
|
|
|
- if user is not None:
|
|
|
- user=getpwnam(user)[2]
|
|
|
- else:
|
|
|
- user = os.getuid()
|
|
|
- threadLocal.uid = user
|
|
|
- except Exception:
|
|
|
- logger.warn("%s %s %s can not switch user for START_ACTION."
|
|
|
- % (clusterId, component, role))
|
|
|
- code = 0
|
|
|
- commandResult = {}
|
|
|
- process = self.getServerKey(clusterId,clusterDefinitionRevision,
|
|
|
- component,role)
|
|
|
- if not process in serverTracker:
|
|
|
- try:
|
|
|
- plauncher = processlauncher(script,user)
|
|
|
- plauncher.start()
|
|
|
- plauncher.blockUntilProcessCreation()
|
|
|
- except Exception:
|
|
|
- traceback.print_exc()
|
|
|
- logger.warn("Can not launch process for %s %s %s"
|
|
|
- % (clusterId, component, role))
|
|
|
- code = -1
|
|
|
- serverTracker[process] = plauncher
|
|
|
- commandResult['exitCode'] = code
|
|
|
- result['commandResult'] = commandResult
|
|
|
- try:
|
|
|
- os.chdir(oldDir)
|
|
|
- except Exception:
|
|
|
- logger.warn("%s %s %s can not restore environment for START_ACTION." \
|
|
|
- % (clusterId, component, role))
|
|
|
- return result
|
|
|
-
|
|
|
- # Stop a process and remove presisted state
|
|
|
- def stopProcess(self, processKey):
|
|
|
- global serverTracker
|
|
|
- keyFragments = processKey.split('/')
|
|
|
- process = self.getServerKey(keyFragments[0],keyFragments[1],
|
|
|
- keyFragments[2],keyFragments[3])
|
|
|
- if process in serverTracker:
|
|
|
- logger.info ("Sending %s with PID %d the SIGTERM signal"
|
|
|
- % (process,serverTracker[process].getpid()))
|
|
|
- killprocessgrp(serverTracker[process].getpid())
|
|
|
- del serverTracker[process]
|
|
|
-
|
|
|
def getServerTracker(self):
|
|
|
- return serverTracker
|
|
|
-
|
|
|
- def getServerKey(self,clusterId, clusterDefinitionRevision, component, role):
|
|
|
- return clusterId+"/"+str(clusterDefinitionRevision)+"/"+component+"/"+role
|
|
|
-
|
|
|
- def getWorkDir(self, clusterId, role):
|
|
|
- prefix = AmbariConfig.config.get('stack','installprefix')
|
|
|
- return str(os.path.join(prefix, clusterId, role))
|
|
|
-
|
|
|
-
|
|
|
-class processlauncher(threading.Thread):
|
|
|
- def __init__(self,script,uid):
|
|
|
- threading.Thread.__init__(self)
|
|
|
- self.script = script
|
|
|
- self.serverpid = -1
|
|
|
- self.uid = uid
|
|
|
- self.out = None
|
|
|
- self.err = None
|
|
|
-
|
|
|
- def run(self):
|
|
|
- try:
|
|
|
- tempfilename = tempfile.mktemp()
|
|
|
- noteTempFile(tempfilename)
|
|
|
- pythoncmd = sys.executable
|
|
|
- tmp = open(tempfilename, 'w')
|
|
|
- tmp.write(self.script['script'])
|
|
|
- tmp.close()
|
|
|
- threadLocal.uid = self.uid
|
|
|
- self.cmd = "%s %s %s" % (pythoncmd, tempfilename,
|
|
|
- " ".join(self.script['param']))
|
|
|
- logger.info("Launching %s as uid %d" % (self.cmd,self.uid) )
|
|
|
- p = subprocess.Popen(self.cmd,
|
|
|
- preexec_fn=self.changeUidAndSetSid,
|
|
|
- stdout=subprocess.PIPE,
|
|
|
- stderr=subprocess.PIPE, shell=True, close_fds=True)
|
|
|
- logger.info("Launched %s; PID %d" % (self.cmd,p.pid))
|
|
|
- self.serverpid = p.pid
|
|
|
- self.out, self.err = p.communicate()
|
|
|
- self.code = p.wait()
|
|
|
- logger.info("%s; PID %d exited with code %d \nSTDOUT: %s\nSTDERR %s" %
|
|
|
- (self.cmd,p.pid,self.code,self.out,self.err))
|
|
|
- except:
|
|
|
- logger.warn("Exception encountered while launching : " + self.cmd)
|
|
|
- traceback.print_exc()
|
|
|
-
|
|
|
- os.unlink(self.getpidfile())
|
|
|
- os.unlink(tempfilename)
|
|
|
-
|
|
|
- def blockUntilProcessCreation(self):
|
|
|
- self.getpid()
|
|
|
-
|
|
|
- def getpid(self):
|
|
|
- sleepCount = 1
|
|
|
- while (self.serverpid == -1):
|
|
|
- time.sleep(1)
|
|
|
- logger.info("Waiting for process %s to start" % self.cmd)
|
|
|
- if sleepCount > 10:
|
|
|
- logger.warn("Couldn't start process %s even after %d seconds"
|
|
|
- % (self.cmd,sleepCount))
|
|
|
- os._exit(1)
|
|
|
- return self.serverpid
|
|
|
-
|
|
|
- def getpidfile(self):
|
|
|
- prefix = AmbariConfig.config.get('stack','installprefix')
|
|
|
- pidfile = os.path.join(prefix,str(self.getpid())+".pid")
|
|
|
- return pidfile
|
|
|
-
|
|
|
- def changeUidAndSetSid(self):
|
|
|
- prefix = AmbariConfig.config.get('stack','installprefix')
|
|
|
- pidfile = os.path.join(prefix,str(os.getpid())+".pid")
|
|
|
- #TODO remove try/except (when there is a way to provide
|
|
|
- #config files for testcases). The default config will want
|
|
|
- #to create files in /var/ambari which may not exist unless
|
|
|
- #specifically created.
|
|
|
- #At that point add a testcase for the pid file management.
|
|
|
- try:
|
|
|
- f = open(pidfile,'w')
|
|
|
- f.close()
|
|
|
- except:
|
|
|
- logger.warn("Couldn't write pid file %s for %s" % (pidfile,self.cmd))
|
|
|
- changeUid()
|
|
|
- os.setsid()
|
|
|
+ return serverTracker
|