Browse Source

AMBARI-1541. Upgrade task support in agent. (Sumit Mohanty via swagle)

git-svn-id: https://svn.apache.org/repos/asf/incubator/ambari/trunk@1451746 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Wagle 12 years ago
parent
commit
04541f0e0f

+ 2 - 0
CHANGES.txt

@@ -12,6 +12,8 @@ Trunk (unreleased changes):
 
 
  NEW FEATURES
  NEW FEATURES
 
 
+ AMBARI-1541. Upgrade task support in agent. (Sumit Mohanty via swagle)
+
  AMBARI-1540. Reassign Master Wizard - Steps 3 and 4 (reconfigure
  AMBARI-1540. Reassign Master Wizard - Steps 3 and 4 (reconfigure
  component and review). (yusaku)
  component and review). (yusaku)
 
 

+ 27 - 21
ambari-agent/src/main/python/ambari_agent/puppetExecutor.py → ambari-agent/src/main/python/ambari_agent/PuppetExecutor.py

@@ -31,21 +31,14 @@ import traceback
 
 
 logger = logging.getLogger()
 logger = logging.getLogger()
 
 
-class puppetExecutor:
+class PuppetExecutor:
 
 
   """ Class that executes the commands that come from the server using puppet.
   """ Class that executes the commands that come from the server using puppet.
   This is the class that provides the pluggable point for executing the puppet"""
   This is the class that provides the pluggable point for executing the puppet"""
 
 
-  # How many lines from command output send to server
-  OUTPUT_LAST_LINES = 10
-  # How many lines from command error output send to server (before Err phrase)
-  ERROR_LAST_LINES_BEFORE = 30
-  # How many lines from command error output send to server (after Err phrase)
-  ERROR_LAST_LINES_AFTER = 30
-
   # How many seconds will pass before running puppet is terminated on timeout
   # How many seconds will pass before running puppet is terminated on timeout
   PUPPET_TIMEOUT_SECONDS = 600
   PUPPET_TIMEOUT_SECONDS = 600
-
+  grep = Grep()
   event = threading.Event()
   event = threading.Event()
   last_puppet_has_been_killed = False
   last_puppet_has_been_killed = False
 
 
@@ -58,6 +51,7 @@ class puppetExecutor:
     self.tmpDir = tmpDir
     self.tmpDir = tmpDir
     self.reposInstalled = False
     self.reposInstalled = False
     self.config = config
     self.config = config
+    self.modulesdir = self.puppetModule + "/modules"
 
 
   def configureEnviron(self, environ):
   def configureEnviron(self, environ):
     if not self.config.has_option("puppet", "ruby_home"):
     if not self.config.has_option("puppet", "ruby_home"):
@@ -73,7 +67,7 @@ class puppetExecutor:
     
     
   def getPuppetBinary(self):
   def getPuppetBinary(self):
     puppetbin = os.path.join(self.puppetInstall, "bin", "puppet") 
     puppetbin = os.path.join(self.puppetInstall, "bin", "puppet") 
-    if (os.path.exists(puppetbin)):
+    if os.path.exists(puppetbin):
       return puppetbin
       return puppetbin
     else:
     else:
       logger.info("Using default puppet on the host : " + puppetbin 
       logger.info("Using default puppet on the host : " + puppetbin 
@@ -81,9 +75,9 @@ class puppetExecutor:
       return "puppet"
       return "puppet"
      
      
   def deployRepos(self, command, tmpDir, modulesdir, taskId):
   def deployRepos(self, command, tmpDir, modulesdir, taskId):
-    """ Hack to only create the repo files once """
+    # Hack to only create the repo files once
     result = []
     result = []
-    if (not self.reposInstalled):
+    if not self.reposInstalled:
       repoInstaller = RepoInstaller(command, tmpDir, modulesdir, taskId, self.config)
       repoInstaller = RepoInstaller(command, tmpDir, modulesdir, taskId, self.config)
       result = repoInstaller.installRepos()
       result = repoInstaller.installRepos()
     return result
     return result
@@ -102,32 +96,44 @@ class puppetExecutor:
     pass
     pass
 
 
   def condenseOutput(self, stdout, stderr, retcode):
   def condenseOutput(self, stdout, stderr, retcode):
-    grep = Grep()
+    grep = self.grep
     if stderr == self.NO_ERROR:
     if stderr == self.NO_ERROR:
-      result = grep.tail(stdout, self.OUTPUT_LAST_LINES)
+      result = grep.tail(stdout, grep.OUTPUT_LAST_LINES)
     else:
     else:
-      result = grep.grep(stdout, "fail", self.ERROR_LAST_LINES_BEFORE, self.ERROR_LAST_LINES_AFTER)
+      result = grep.grep(stdout, "fail", grep.ERROR_LAST_LINES_BEFORE, grep.ERROR_LAST_LINES_AFTER)
       if result is None: # Second try
       if result is None: # Second try
-       result = grep.grep(stdout, "err", self.ERROR_LAST_LINES_BEFORE, self.ERROR_LAST_LINES_AFTER)
+       result = grep.grep(stdout, "err", grep.ERROR_LAST_LINES_BEFORE, grep.ERROR_LAST_LINES_AFTER)
     filteredresult = grep.filterMarkup(result)
     filteredresult = grep.filterMarkup(result)
     return filteredresult
     return filteredresult
 
 
   def isSuccessfull(self, returncode):
   def isSuccessfull(self, returncode):
     return not self.last_puppet_has_been_killed and (returncode == 0 or returncode == 2)
     return not self.last_puppet_has_been_killed and (returncode == 0 or returncode == 2)
 
 
+  def just_run_one_file(self, command, file, tmpout, tmperr):
+    result = {}
+    taskId = 0
+    if command.has_key("taskId"):
+      taskId = command['taskId']
+    #Install repos
+    self.deployRepos(command, self.tmpDir, self.modulesdir, command.taskId)
+    puppetEnv = os.environ
+    self.runPuppetFile(file, result, puppetEnv, tmpout, tmperr)
+    if self.isSuccessfull(result["exitcode"]):
+      # Check if all the repos were installed or not and reset the flag
+      self.reposInstalled = True
+    return result
+
   def runCommand(self, command, tmpoutfile, tmperrfile):
   def runCommand(self, command, tmpoutfile, tmperrfile):
     result = {}
     result = {}
     taskId = 0
     taskId = 0
     if command.has_key("taskId"):
     if command.has_key("taskId"):
       taskId = command['taskId']
       taskId = command['taskId']
-      
     puppetEnv = os.environ
     puppetEnv = os.environ
     #Install repos
     #Install repos
-    modulesdir = self.puppetModule + "/modules"
-    puppetFiles = self.deployRepos(command, self.tmpDir, modulesdir, taskId)
+    puppetFiles = self.deployRepos(command, self.tmpDir, self.modulesdir, taskId)
     siteppFileName = os.path.join(self.tmpDir, "site-" + str(taskId) + ".pp") 
     siteppFileName = os.path.join(self.tmpDir, "site-" + str(taskId) + ".pp") 
     puppetFiles.append(siteppFileName)
     puppetFiles.append(siteppFileName)
-    generateManifest(command, siteppFileName, modulesdir, self.config)
+    generateManifest(command, siteppFileName, self.modulesdir, self.config)
     #Run all puppet commands, from manifest generator and for repos installation
     #Run all puppet commands, from manifest generator and for repos installation
     #Appending outputs and errors, exitcode - maximal from all
     #Appending outputs and errors, exitcode - maximal from all
     for puppetFile in puppetFiles:
     for puppetFile in puppetFiles:
@@ -227,7 +233,7 @@ def main():
   jsonStr = jsonFile.read() 
   jsonStr = jsonFile.read() 
   # Below is for testing only.
   # Below is for testing only.
   
   
-  puppetInstance = puppetExecutor("/home/centos/ambari_repo_info/ambari-agent/src/main/puppet/",
+  puppetInstance = PuppetExecutor("/home/centos/ambari_repo_info/ambari-agent/src/main/puppet/",
                                   "/usr/",
                                   "/usr/",
                                   "/root/workspace/puppet-install/facter-1.6.10/",
                                   "/root/workspace/puppet-install/facter-1.6.10/",
                                   "/tmp")
                                   "/tmp")

+ 43 - 0
ambari-agent/src/main/python/ambari_agent/PythonExecutor.py

@@ -0,0 +1,43 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import os.path
+import logging
+import subprocess
+import pprint, threading
+from Grep import Grep
+from threading import Thread
+import shell
+import traceback
+
+logger = logging.getLogger()
+
+class PythonExecutor:
+
+  def __init__(self):
+    pass
+
+  def run_file(self, name, stdout, stderr):
+    """
+    Executes the file specified in a separate subprocess.
+    Method returns only when the subprocess is finished or timeout is exceeded
+    """
+    # TODO: implement
+    logger.warn("TODO: Python file execution is not supported yet")
+    pass

+ 114 - 0
ambari-agent/src/main/python/ambari_agent/StackVersionsFileHandler.py

@@ -0,0 +1,114 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import os.path
+import logging
+import subprocess
+import pprint, threading
+from Grep import Grep
+from threading import Thread
+import shell
+import traceback
+import shutil
+
+logger = logging.getLogger()
+
+class StackVersionsFileHandler:
+
+  VER_FILE = "current-stack"
+  DEFAULT_VER = ""
+
+  def __init__(self, versionsFileDir):
+    self.versionsFileDir = versionsFileDir
+    self.versionsFilePath = os.path.join(versionsFileDir, self.VER_FILE)
+    self._lock = threading.RLock()
+
+  def read_stack_version(self, component):
+    try :
+      self.touch_file()
+      for line in open(self.versionsFilePath):
+        comp, ver = self.extract(line)
+        if comp == component:
+          return ver
+      return self.DEFAULT_VER
+    except Exception, err:
+      logger.error("Can't read versions file: %s " % err.message)
+      traceback.print_exc()
+      return self.DEFAULT_VER
+
+
+  def read_all_stack_versions(self):
+    result = {}
+    try :
+      self.touch_file()
+      for line in open(self.versionsFilePath):
+        comp, ver = self.extract(line)
+        if comp != self.DEFAULT_VER:
+          result[comp] = ver
+      return result
+    except Exception, err:
+      logger.error("Can't read stack versions file: %s " % err.message)
+      traceback.print_exc()
+      return {}
+
+
+  def write_stack_version(self, component, newVersion):
+    self._lock.acquire()
+    try:
+      values = self.read_all_stack_versions()
+      values[component] = newVersion
+      logger.info("Backing up old stack versions file")
+      backup = os.path.join(self.versionsFileDir, self.VER_FILE + ".bak")
+      shutil.move(self.versionsFilePath, backup)
+      logger.info("Writing new stack versions file")
+      with open (self.versionsFilePath, 'w') as f:
+        for key in values:
+          f.write ("%s\t%s\n" % (key, values))
+
+    except Exception, err:
+      logger.error("Can't write new stack version (%s %s) :%s " % (component,
+            newVersion, err.message))
+      traceback.print_exc()
+    finally:
+      self._lock.release()
+
+
+  def extract(self, statement):
+    '''
+    Extracts <Component>, <HDPstack version> values from lines like
+    NAGIOS	StackVersion-1.3.0
+    '''
+    parts = statement.strip().split()
+    if len(parts) != 2:
+      logger.warn("Wrong stack versions file statement format: %s" % statement)
+      return self.DEFAULT_VER, self.DEFAULT_VER
+    else:
+      return parts[0], parts[1]
+
+
+  def touch_file(self):
+    '''
+     Called to create file when it does not exist
+    '''
+    if not os.path.isfile(self.versionsFilePath):
+      logger.info("Creating stacks versions file at %s" % self.versionsFilePath)
+      open(self.versionsFilePath, 'w').close()
+
+

+ 202 - 0
ambari-agent/src/main/python/ambari_agent/UpgradeExecutor.py

@@ -0,0 +1,202 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import json
+import os.path
+import logging
+import subprocess
+from manifestGenerator import generateManifest
+from RepoInstaller import RepoInstaller
+import pprint, threading
+from Grep import Grep
+from threading import Thread
+import shell
+import traceback
+from Grep import Grep
+from StackVersionsFileHandler import StackVersionsFileHandler
+import re
+
+logger = logging.getLogger()
+grep = Grep()
+
+class UpgradeExecutor:
+
+  """ Class that performs the StackVersion stack upgrade"""
+
+  SCRIPT_DIRS = [
+    'pre-upgrade.d',
+    'upgrade.d',
+    'post-upgrade.d'
+  ]
+
+  NAME_PARSING_FAILED_CODE = 999
+
+  def __init__(self, pythonExecutor, puppetExecutor, config):
+    self.pythonExecutor = pythonExecutor
+    self.puppetExecutor = puppetExecutor
+    self.stacksDir = config.get('stack', 'upgradeScriptsDir')
+    self.config = config
+    versionsFileDir = config.get('agent', 'prefix')
+    self.versionsHandler = StackVersionsFileHandler(versionsFileDir)
+
+
+  def perform_stack_upgrade(self, command, tmpout, tmperr):
+    logger.info("Performing stack upgrade")
+    params = command['commandParams']
+    srcStack = params['source_stack_version']
+    tgtStack = params['target_stack_version']
+    component = command['component']
+
+    srcStackTuple = self.split_stack_version(srcStack)
+    tgtStackTuple = self.split_stack_version(srcStack)
+
+    if srcStackTuple == None or tgtStackTuple == None:
+      errorstr = "Source (%s) or target (%s) version does not match pattern \
+      <Name>-<Version>" % (srcStack, tgtStack)
+      logger.info(errorstr)
+      result = {
+        'exitcode' : 1,
+        'stdout'   : 'None',
+        'stderr'   : errorstr
+      }
+    elif srcStack != tgtStack:
+      paramTuple = sum((srcStackTuple, tgtStackTuple), ())
+      upgradeId = "%s-%s.%s_%s-%s.%s" % paramTuple
+      # Check stack version (do we need upgrade?)
+      basedir = os.path.join(self.stacksDir, upgradeId, component)
+      if not os.path.isdir(basedir):
+        errorstr = "Upgrade %s is not supported" % upgradeId
+        logger.error(errorstr)
+        result = {
+          'exitcode' : 1,
+          'stdout'   : errorstr,
+          'stderr'   : errorstr
+        }
+      else:
+        result = {
+          'exitcode' : 0,
+          'stdout'   : '',
+          'stderr'   : ''
+        }
+        for dir in self.SCRIPT_DIRS:
+          if result['exitcode'] != 0:
+            break
+          tmpRes = self.execute_dir(command, basedir, dir, tmpout, tmperr)
+
+          result = {
+            'exitcode' : result['exitcode'] or tmpRes['exitcode'],
+            'stdout'   : "%s\n%s" % (result['stdout'], tmpRes['stdout']),
+            'stderr'   : "%s\n%s" % (result['stderr'], tmpRes['stderr']),
+          }
+
+        if result['exitcode'] == 0:
+          logger.info("Upgrade %s successfully finished" % upgradeId)
+          self.versionsHandler.write_stack_version(component, tgtStack)
+    else:
+      infostr = "target_stack_version (%s) matches current stack version" \
+          " for component %s, nothing to do" % (tgtStack, component)
+      logger.info(infostr)
+      result = {
+        'exitcode' : 0,
+        'stdout'   : infostr,
+        'stderr'   : 'None'
+      }
+    result = {
+      'exitcode' : result['exitcode'],
+      'stdout'   : grep.tail(result['stdout'], grep.OUTPUT_LAST_LINES),
+      'stderr'   : grep.tail(result['stderr'], grep.OUTPUT_LAST_LINES)
+    }
+    return result
+
+
+  def get_key_func(self, name):
+    """
+    Returns a number from filenames like 70-foobar.* or 999 for not matching
+    filenames
+    """
+    parts = name.split('-', 1)
+    if not parts or not parts[0].isdigit():
+      logger.warn("Can't parse script filename number %s" % name)
+      return self.NAME_PARSING_FAILED_CODE # unknown element will be placed to the end of list
+    return int(parts[0])
+
+
+  def split_stack_version(self, verstr):
+    matchObj = re.match( r'^(.*)-(\d+).(\d+)', verstr.strip(), re.M|re.I)
+    stack_name = matchObj.group(1)
+    stack_major_ver = matchObj.group(2)
+    stack_minor_ver = matchObj.group(3)
+    if matchObj:
+      return stack_name, stack_major_ver, stack_minor_ver
+    else:
+      return None
+
+
+  def execute_dir(self, command, basedir, dir, tmpout, tmperr):
+    """
+    Executes *.py and *.pp files located in a given directory.
+    Files a executed in a numeric sorting order.
+    """
+    dirpath = os.path.join(basedir, dir)
+    logger.info("Executing %s" % dirpath)
+    if not os.path.isdir(dirpath):
+      logger.warn("Script directory %s does not exist, skipping")
+      return
+    fileList=os.listdir(dirpath)
+    fileList.sort(key = self.get_key_func)
+    formattedResult = {
+      'exitcode' : 0,
+      'stdout'   : '',
+      'stderr'   : ''
+    }
+    for filename in fileList:
+      prevcode = formattedResult['exitcode']
+      if prevcode != 0 or self.get_key_func(filename) == self.NAME_PARSING_FAILED_CODE:
+        break
+      filepath = os.path.join(dirpath, filename)
+      if filename.endswith(".pp"):
+        logger.info("Running puppet file %s" % filepath)
+        result = self.puppetExecutor.just_run_one_file(command, filename,
+                                                                tmpout, tmperr)
+      elif filename.endswith(".py"):
+        logger.info("Running python file %s" % filepath)
+        result = self.pythonExecutor.run_file(filepath, tmpout, tmperr)
+      elif filename.endswith(".pyc"):
+        pass # skipping compiled files
+      else:
+        warnstr = "Unrecognized file type, skipping: %s" % filepath
+        logger.warn(warnstr)
+        result = {
+          'exitcode' : 0,
+          'stdout'   : warnstr,
+          'stderr'   : 'None'
+        }
+      formattedResult = {
+        'exitcode' : prevcode or result['exitcode'],
+        'stdout'   : "%s\n%s" % (formattedResult['stdout'], result['stdout']),
+        'stderr'   : "%s\n%s" % (formattedResult['stderr'], result['stderr']),
+      }
+    logger.debug("Result of %s: \n %s" % (dirpath, pprint.pformat(formattedResult)))
+    return formattedResult
+
+
+
+
+
+

+ 82 - 3
ambari-agent/src/test/python/TestActionQueue.py

@@ -22,9 +22,9 @@ from unittest import TestCase
 from ambari_agent.ActionQueue import ActionQueue
 from ambari_agent.ActionQueue import ActionQueue
 from ambari_agent.AmbariConfig import AmbariConfig
 from ambari_agent.AmbariConfig import AmbariConfig
 from ambari_agent.FileUtil import getFilePath
 from ambari_agent.FileUtil import getFilePath
+from ambari_agent.UpgradeExecutor import UpgradeExecutor
 import os, errno, time, pprint, tempfile, threading
 import os, errno, time, pprint, tempfile, threading
-
-event = threading.Event()
+from mock.mock import patch, MagicMock, call
 
 
 class TestActionQueue(TestCase):
 class TestActionQueue(TestCase):
   def test_ActionQueueStartStop(self):
   def test_ActionQueueStartStop(self):
@@ -48,7 +48,7 @@ class TestActionQueue(TestCase):
     actionQueue.IDLE_SLEEP_TIME = 0.01
     actionQueue.IDLE_SLEEP_TIME = 0.01
     executor_started_event = threading.Event()
     executor_started_event = threading.Event()
     end_executor_event = threading.Event()
     end_executor_event = threading.Event()
-    actionQueue.executor = FakeExecutor(executor_started_event, end_executor_event)
+    actionQueue.puppetExecutor = FakeExecutor(executor_started_event, end_executor_event)
     before_start_result = actionQueue.result()
     before_start_result = actionQueue.result()
 
 
     command = {
     command = {
@@ -99,6 +99,85 @@ class TestActionQueue(TestCase):
     #print("in_progress: " + pprint.pformat(in_progress_result))
     #print("in_progress: " + pprint.pformat(in_progress_result))
     #print("after: " + pprint.pformat(after_start_result))
     #print("after: " + pprint.pformat(after_start_result))
 
 
+  @patch.object(ActionQueue, "executeCommand")
+  @patch.object(ActionQueue, "stopped")
+  def test_upgradeCommand_dispatching(self, stopped_method, executeCommand_method):
+    queue = ActionQueue(config = MagicMock())
+    command = {
+      'commandId': 17,
+      'role' : "role",
+      'taskId' : "taskId",
+      'clusterName' : "clusterName",
+      'serviceName' : "serviceName",
+      'roleCommand' : 'UPGRADE',
+      'hostname' : "localhost.localdomain",
+      'hostLevelParams': "hostLevelParams",
+      'clusterHostInfo': "clusterHostInfo",
+      'configurations': "configurations",
+      'commandType': "EXECUTION_COMMAND",
+      'configurations':{'global' : {}},
+      'roleParams': {},
+      'commandParams' :	{
+        'source_stack_version' : 'HDP-1.2.1',
+        'target_stack_version' : 'HDP-1.3.0'
+      }
+    }
+    result = [{
+      'exitcode' : 0,
+      'stdout'   : 'abc',
+      'stderr'   : 'def'
+    }]
+    executeCommand_method.return_value = result
+    stopped_method.side_effect = [False, False, True, True, True]
+    queue.stopped = stopped_method
+    queue.IDLE_SLEEP_TIME = 0.001
+    queue.put(command)
+    queue.run()
+    self.assertTrue(executeCommand_method.called)
+    self.assertEquals(queue.resultQueue.qsize(), 1)
+    returned_result = queue.resultQueue.get()
+    self.assertIs(returned_result[1], result[0])
+
+
+  @patch.object(UpgradeExecutor, "perform_stack_upgrade")
+  def test_upgradeCommand_executeCommand(self, perform_stack_upgrade_method):
+    queue = ActionQueue(config = MagicMock())
+    command = {
+      'commandId': 17,
+      'role' : "role",
+      'taskId' : "taskId",
+      'clusterName' : "clusterName",
+      'serviceName' : "serviceName",
+      'roleCommand' : 'UPGRADE',
+      'hostname' : "localhost.localdomain",
+      'hostLevelParams': "hostLevelParams",
+      'clusterHostInfo': "clusterHostInfo",
+      'configurations': "configurations",
+      'commandType': "EXECUTION_COMMAND",
+      'configurations':{'global' : {}},
+      'roleParams': {},
+      'commandParams' :	{
+        'source_stack_version' : 'HDP-1.2.1',
+        'target_stack_version' : 'HDP-1.3.0'
+      }
+    }
+    perform_stack_upgrade_method.return_value = {
+      'exitcode' : 0,
+      'stdout'   : 'abc',
+      'stderr'   : 'def'
+    }
+    result = queue.executeCommand(command)
+    expected_result = [{'actionId': 17,
+                        'clusterName': 'clusterName',
+                        'exitCode': 0,
+                        'role': 'role',
+                        'serviceName': 'serviceName',
+                        'status': 'COMPLETED',
+                        'stderr': 'def',
+                        'stdout': 'abc',
+                        'taskId': 'taskId'}]
+    self.assertEquals(result, expected_result)
+
 
 
 class FakeExecutor():
 class FakeExecutor():
 
 

+ 7 - 5
ambari-agent/src/test/python/TestController.py

@@ -22,6 +22,7 @@ limitations under the License.
 import StringIO
 import StringIO
 import unittest
 import unittest
 from ambari_agent import Controller
 from ambari_agent import Controller
+from ambari_agent import hostname
 import sys
 import sys
 from mock.mock import patch, MagicMock, call
 from mock.mock import patch, MagicMock, call
 
 
@@ -30,19 +31,22 @@ class TestController(unittest.TestCase):
 
 
   @patch("threading.Thread")
   @patch("threading.Thread")
   @patch("threading.Lock")
   @patch("threading.Lock")
-  @patch("socket.gethostname")
   @patch.object(Controller, "NetUtil")
   @patch.object(Controller, "NetUtil")
-  def setUp(self, NetUtil_mock, hostnameMock, lockMock, threadMock):
+  @patch.object(hostname, "hostname")
+  def setUp(self, hostname_method, NetUtil_mock, lockMock, threadMock):
 
 
     Controller.logger = MagicMock()
     Controller.logger = MagicMock()
-    hostnameMock.return_value = "test_hostname"
     lockMock.return_value = MagicMock()
     lockMock.return_value = MagicMock()
     NetUtil_mock.return_value = MagicMock()
     NetUtil_mock.return_value = MagicMock()
+    hostname_method.return_value = "test_hostname"
+
 
 
     config = MagicMock()
     config = MagicMock()
     config.get.return_value = "something"
     config.get.return_value = "something"
 
 
     self.controller = Controller.Controller(config)
     self.controller = Controller.Controller(config)
+    self.controller.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC = 0.1
+    self.controller.netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC = 0.1
 
 
 
 
   @patch.object(Controller, "Heartbeat")
   @patch.object(Controller, "Heartbeat")
@@ -324,9 +328,7 @@ class TestController(unittest.TestCase):
     self.controller.sendRequest = Controller.Controller.sendRequest
     self.controller.sendRequest = Controller.Controller.sendRequest
     self.controller.sendRequest = Controller.Controller.addToQueue
     self.controller.sendRequest = Controller.Controller.addToQueue
 
 
-
 if __name__ == "__main__":
 if __name__ == "__main__":
-
   unittest.main(verbosity=2)
   unittest.main(verbosity=2)
 
 
 
 

+ 17 - 13
ambari-agent/src/test/python/TestPuppetExecutor.py

@@ -19,7 +19,7 @@ limitations under the License.
 '''
 '''
 
 
 from unittest import TestCase
 from unittest import TestCase
-from puppetExecutor import puppetExecutor
+from PuppetExecutor import PuppetExecutor
 from Grep import Grep
 from Grep import Grep
 from pprint import pformat
 from pprint import pformat
 import socket, threading, tempfile
 import socket, threading, tempfile
@@ -28,13 +28,11 @@ import sys
 from AmbariConfig import AmbariConfig
 from AmbariConfig import AmbariConfig
 from threading import Thread
 from threading import Thread
 
 
-grep = Grep()
-
 class TestPuppetExecutor(TestCase):
 class TestPuppetExecutor(TestCase):
 
 
 
 
   def test_build(self):
   def test_build(self):
-    puppetexecutor = puppetExecutor("/tmp", "/x", "/y", "/z", AmbariConfig().getConfig())
+    puppetexecutor = PuppetExecutor("/tmp", "/x", "/y", "/z", AmbariConfig().getConfig())
     command = puppetexecutor.puppetCommand("site.pp")
     command = puppetexecutor.puppetCommand("site.pp")
     self.assertEquals("puppet", command[0], "puppet binary wrong")
     self.assertEquals("puppet", command[0], "puppet binary wrong")
     self.assertEquals("apply", command[1], "local apply called")
     self.assertEquals("apply", command[1], "local apply called")
@@ -43,9 +41,11 @@ class TestPuppetExecutor(TestCase):
     correct")
     correct")
 
 
   def test_condense_bad2(self):
   def test_condense_bad2(self):
-    puppetexecutor = puppetExecutor("/tmp", "/x", "/y", "/z", AmbariConfig().getConfig())
-    puppetexecutor.ERROR_LAST_LINES_BEFORE = 2
-    puppetexecutor.ERROR_LAST_LINES_AFTER = 3
+    puppetexecutor = PuppetExecutor("/tmp", "/x", "/y", "/z", AmbariConfig().getConfig())
+    grep = Grep()
+    puppetexecutor.grep = grep
+    grep.ERROR_LAST_LINES_BEFORE = 2
+    grep.ERROR_LAST_LINES_AFTER = 3
     string_err = open('dummy_puppet_output_error2.txt', 'r').read().replace("\n", os.linesep)
     string_err = open('dummy_puppet_output_error2.txt', 'r').read().replace("\n", os.linesep)
     result = puppetexecutor.condenseOutput(string_err, '', 1)
     result = puppetexecutor.condenseOutput(string_err, '', 1)
     stripped_string = string_err.strip()
     stripped_string = string_err.strip()
@@ -58,7 +58,9 @@ class TestPuppetExecutor(TestCase):
     self.assertEquals(len(result.splitlines(True)), 6, "Failed to condence fail log")
     self.assertEquals(len(result.splitlines(True)), 6, "Failed to condence fail log")
 
 
   def test_condense_bad3(self):
   def test_condense_bad3(self):
-    puppetexecutor = puppetExecutor("/tmp", "/x", "/y", "/z", AmbariConfig().getConfig())
+    puppetexecutor = PuppetExecutor("/tmp", "/x", "/y", "/z", AmbariConfig().getConfig())
+    grep = Grep()
+    puppetexecutor.grep = grep
     string_err = open('dummy_puppet_output_error3.txt', 'r').read().replace("\n", os.linesep)
     string_err = open('dummy_puppet_output_error3.txt', 'r').read().replace("\n", os.linesep)
     result = puppetexecutor.condenseOutput(string_err, '', 1)
     result = puppetexecutor.condenseOutput(string_err, '', 1)
     stripped_string = string_err.strip()
     stripped_string = string_err.strip()
@@ -72,10 +74,12 @@ class TestPuppetExecutor(TestCase):
     self.assertEquals(len(result.splitlines(True)), 33, "Failed to condence fail log")
     self.assertEquals(len(result.splitlines(True)), 33, "Failed to condence fail log")
 
 
   def test_condense_good(self):
   def test_condense_good(self):
-    puppetexecutor = puppetExecutor("/tmp", "/x", "/y", "/z", AmbariConfig().getConfig())
-    puppetexecutor.OUTPUT_LAST_LINES = 2
+    puppetexecutor = PuppetExecutor("/tmp", "/x", "/y", "/z", AmbariConfig().getConfig())
+    grep = Grep()
+    puppetexecutor.grep = grep
+    grep.OUTPUT_LAST_LINES = 2
     string_good = open('dummy_puppet_output_good.txt', 'r').read().replace("\n", os.linesep)
     string_good = open('dummy_puppet_output_good.txt', 'r').read().replace("\n", os.linesep)
-    result = puppetexecutor.condenseOutput(string_good, puppetExecutor.NO_ERROR, 0)
+    result = puppetexecutor.condenseOutput(string_good, PuppetExecutor.NO_ERROR, 0)
     stripped_string = string_good.strip()
     stripped_string = string_good.strip()
     lines = stripped_string.splitlines(True)
     lines = stripped_string.splitlines(True)
     result_check = lines[45].strip() in result and lines[46].strip() in result
     result_check = lines[45].strip() in result and lines[46].strip() in result
@@ -129,13 +133,13 @@ class TestPuppetExecutor(TestCase):
     self.assertEquals(subproc_mock.returncode, 0, "Subprocess should not be terminated before timeout")
     self.assertEquals(subproc_mock.returncode, 0, "Subprocess should not be terminated before timeout")
 
 
 
 
-  class  PuppetExecutor_mock(puppetExecutor):
+  class  PuppetExecutor_mock(PuppetExecutor):
 
 
 
 
 
 
     def __init__(self, puppetModule, puppetInstall, facterInstall, tmpDir, config, subprocess_mockup):
     def __init__(self, puppetModule, puppetInstall, facterInstall, tmpDir, config, subprocess_mockup):
       self.subprocess_mockup = subprocess_mockup
       self.subprocess_mockup = subprocess_mockup
-      puppetExecutor.__init__(self, puppetModule, puppetInstall, facterInstall, tmpDir, config)
+      PuppetExecutor.__init__(self, puppetModule, puppetInstall, facterInstall, tmpDir, config)
       pass
       pass
 
 
     def lauch_puppet_subprocess(self, puppetcommand, tmpout, tmperr, puppetEnv):
     def lauch_puppet_subprocess(self, puppetcommand, tmpout, tmperr, puppetEnv):

+ 2 - 2
ambari-agent/src/test/python/TestPuppetExecutorManually.py

@@ -19,7 +19,7 @@ limitations under the License.
 '''
 '''
 
 
 from unittest import TestCase
 from unittest import TestCase
-from puppetExecutor import puppetExecutor
+from PuppetExecutor import PuppetExecutor
 from pprint import pformat
 from pprint import pformat
 import socket
 import socket
 import os
 import os
@@ -43,7 +43,7 @@ class TestPuppetExecutor(TestCase):
 
 
     logger.info("***** RUNNING " + FILEPATH + " *****")
     logger.info("***** RUNNING " + FILEPATH + " *****")
     cwd = os.getcwd()
     cwd = os.getcwd()
-    puppetexecutor = puppetExecutor(cwd, "/x", "/y", "/tmp", AmbariConfig().getConfig())
+    puppetexecutor = PuppetExecutor(cwd, "/x", "/y", "/tmp", AmbariConfig().getConfig())
     result = {}
     result = {}
     puppetEnv = os.environ
     puppetEnv = os.environ
     _, tmpoutfile = tempfile.mkstemp()
     _, tmpoutfile = tempfile.mkstemp()

+ 84 - 0
ambari-agent/src/test/python/TestStackVersionsFileHandler.py

@@ -0,0 +1,84 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+import unittest
+import StringIO
+import socket
+import os, sys
+from mock.mock import patch
+from mock.mock import MagicMock
+from mock.mock import create_autospec
+import os, errno, tempfile
+from ambari_agent import StackVersionsFileHandler
+import logging
+
+stackVersionsFileHandler = \
+      StackVersionsFileHandler.StackVersionsFileHandler("/tmp")
+dummyVersionsFile = os.path.join('dummy_files', 'dummy_current_stack')
+
+class TestStackVersionsFileHandler(TestCase):
+
+  logger = logging.getLogger()
+
+  @patch.object(stackVersionsFileHandler, 'touch_file')
+  def test_read_stack_version(self, touch_method):
+    stackVersionsFileHandler.versionsFilePath = dummyVersionsFile
+    result = stackVersionsFileHandler.read_stack_version("NAGIOS")
+    self.assertEquals(result, "HDP-1.2.1")
+    result = stackVersionsFileHandler.read_stack_version("GANGLIA")
+    self.assertEquals(result, "HDP-1.2.2")
+    result = stackVersionsFileHandler.read_stack_version("NOTEXISTING")
+    self.assertEquals(result, stackVersionsFileHandler.DEFAULT_VER)
+    self.assertTrue(touch_method.called)
+
+
+  @patch.object(stackVersionsFileHandler, 'touch_file')
+  def test_read_all_stack_versions(self, touch_method):
+    stackVersionsFileHandler.versionsFilePath = dummyVersionsFile
+    result = stackVersionsFileHandler.read_all_stack_versions()
+    self.assertEquals(len(result.keys()), 4)
+    self.assertEquals(result["NAGIOS"], "HDP-1.2.1")
+    self.assertEquals(result["HBASE"], "HDP-1.3.0")
+    self.assertTrue(touch_method.called)
+
+
+  def test_extract(self):
+    s = "   NAGIOS	\t  HDP-1.3.0  "
+    comp, ver = stackVersionsFileHandler.extract(s)
+    self.assertEqual(comp, "NAGIOS")
+    self.assertEqual(ver, "HDP-1.3.0")
+    # testing wrong value
+    s = "   NAGIOS	"
+    comp, ver = stackVersionsFileHandler.extract(s)
+    self.assertEqual(comp, stackVersionsFileHandler.DEFAULT_VER)
+    self.assertEqual(ver, stackVersionsFileHandler.DEFAULT_VER)
+
+
+  def test_touch_file(self):
+    tmpfile = tempfile.mktemp()
+    stackVersionsFileHandler.versionsFilePath = tmpfile
+    stackVersionsFileHandler.touch_file()
+    result = os.path.isfile(tmpfile)
+    self.assertEqual(result, True)
+
+
+if __name__ == "__main__":
+  unittest.main(verbosity=2)

+ 220 - 0
ambari-agent/src/test/python/TestUpgradeExecutor.py

@@ -0,0 +1,220 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+import unittest
+import StringIO
+import socket
+import os, sys, pprint
+from mock.mock import patch
+from mock.mock import MagicMock
+from mock.mock import create_autospec
+import os, errno, tempfile
+from ambari_agent import UpgradeExecutor
+import logging
+from ambari_agent import AmbariConfig
+from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
+
+class TestUpgradeExecutor(TestCase):
+
+  logger = logging.getLogger()
+
+  @patch.object(StackVersionsFileHandler, 'write_stack_version')
+  @patch('os.path.isdir')
+  def test_perform_stack_upgrade(self, isdir_method, write_stack_version_method):
+    executor = UpgradeExecutor.UpgradeExecutor('pythonExecutor',
+      'puppetExecutor', AmbariConfig.AmbariConfig().getConfig())
+
+    # Checking matching versions
+    command = {
+      'commandParams' :	{
+        'source_stack_version' : 'HDP-1.3.0',
+        'target_stack_version' : 'HDP-1.3.0',
+       },
+      'component' : 'HDFS'
+    }
+    result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
+    self.assertIn('matches current stack version', result['stdout'])
+    self.assertFalse(write_stack_version_method.called)
+    # Checking unsupported update
+    write_stack_version_method.reset()
+    command = {
+      'commandParams' :	{
+        'source_stack_version' : 'HDP-1.0.1',
+        'target_stack_version' : 'HDP-1.3.0',
+      },
+      'component' : 'HDFS'
+    }
+    isdir_method.return_value = False
+    result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
+    self.assertIn('not supported', result['stderr'])
+    self.assertFalse(write_stack_version_method.called)
+    # Checking successful result
+    write_stack_version_method.reset()
+    command = {
+      'commandParams' :	{
+        'source_stack_version' : 'HDP-1.0.1',
+        'target_stack_version' : 'HDP-1.3.0',
+      },
+      'component' : 'HDFS'
+    }
+    isdir_method.return_value = True
+    executor.execute_dir = lambda command, basedir, dir, tmpout, tmperr : \
+      {
+        'exitcode' : 0,
+        'stdout'   : "output - %s" % dir,
+        'stderr'   : "errors - %s" % dir,
+      }
+    result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
+    self.assertTrue(write_stack_version_method.called)
+    self.assertEquals(result['exitcode'],0)
+    self.assertEquals(result['stdout'],'output - pre-upgrade.d\noutput - upgrade.d\noutput - post-upgrade.d')
+    self.assertEquals(result['stderr'],'errors - pre-upgrade.d\nerrors - upgrade.d\nerrors - post-upgrade.d')
+    # Checking failed result
+    write_stack_version_method.reset()
+    command = {
+      'commandParams' :	{
+        'source_stack_version' : 'HDP-1.0.1',
+        'target_stack_version' : 'HDP-1.3.0',
+      },
+      'component' : 'HDFS'
+    }
+    isdir_method.return_value = True
+    executor.execute_dir = lambda command, basedir, dir, tmpout, tmperr :\
+    {
+      'exitcode' : 1,
+      'stdout'   : "output - %s" % dir,
+      'stderr'   : "errors - %s" % dir,
+      }
+    result = executor.perform_stack_upgrade(command, 'tmpout', 'tmperr')
+    self.assertTrue(write_stack_version_method.called)
+    self.assertEquals(result['exitcode'],1)
+    self.assertEquals(result['stdout'],'output - pre-upgrade.d')
+    self.assertEquals(result['stderr'],'errors - pre-upgrade.d')
+
+
+  def test_get_key_func(self):
+    executor = UpgradeExecutor.UpgradeExecutor('pythonExecutor',
+                 'puppetExecutor', AmbariConfig.AmbariConfig().getConfig())
+    # Checking unparseable
+    self.assertEqual(executor.get_key_func('fdsfds'), 999)
+    self.assertEqual(executor.get_key_func('99dfsfd'), 999)
+    self.assertEqual(executor.get_key_func('-fdfds'), 999)
+    # checking parseable
+    self.assertEqual(executor.get_key_func('99'), 99)
+    self.assertEqual(executor.get_key_func('45-install'), 45)
+    self.assertEqual(executor.get_key_func('33-install-staff'), 33)
+    #checking sorting of full list
+    testlist1 = ['7-fdfd', '10-erewfds', '11-fdfdfd', '1-hh', '20-kk', '01-tt']
+    testlist1.sort(key = executor.get_key_func)
+    self.assertEqual(testlist1,
+        ['1-hh', '01-tt', '7-fdfd', '10-erewfds', '11-fdfdfd', '20-kk'])
+
+
+  def test_split_stack_version(self):
+    executor = UpgradeExecutor.UpgradeExecutor('pythonExecutor',
+             'puppetExecutor', AmbariConfig.AmbariConfig().getConfig())
+    result = executor.split_stack_version("HDP-1.2.1")
+    self.assertEquals(result, ('HDP', '1', '2'))
+    result = executor.split_stack_version("HDP-1.3")
+    self.assertEquals(result, ('HDP', '1', '3'))
+    result = executor.split_stack_version("ComplexStackVersion-1.3.4.2.2")
+    self.assertEquals(result, ('ComplexStackVersion', '1', '3'))
+    pass
+
+
+  @patch('os.listdir')
+  @patch('os.path.isdir')
+  @patch.object(UpgradeExecutor.UpgradeExecutor, 'get_key_func')
+  def test_execute_dir(self, get_key_func_method, isdir_method, listdir_method):
+    pythonExecutor = MagicMock()
+    puppetExecutor = MagicMock()
+
+    command = {'debug': 'command'}
+    isdir_method.return_value = True
+    # Mocking sort() method of list
+    class MyList(list):
+      pass
+    files = MyList(['first.py', 'second.pp', 'third.py', 'fourth.nm',
+             'fifth-failing.py', 'six.py'])
+    files.sort = lambda key: None
+    listdir_method.return_value = files
+    # fifth-failing.py will fail
+    pythonExecutor.run_file.side_effect = [
+      {'exitcode' : 0,
+       'stdout'   : "stdout - first.py",
+       'stderr'   : "stderr - first.py"},
+      {'exitcode' : 0,
+       'stdout'   : "stdout - third.py",
+       'stderr'   : "stderr - third.py"},
+      {'exitcode' : 1,
+       'stdout'   : "stdout - fifth-failing.py",
+       'stderr'   : "stderr - fifth-failing.py"},
+      {'exitcode' : 0,
+       'stdout'   : "stdout - six.py",
+       'stderr'   : "stderr - six.py"},
+    ]
+    puppetExecutor.just_run_one_file.side_effect = [
+      {'exitcode' : 0,
+       'stdout'   : "stdout - second.pp",
+       'stderr'   : "stderr - second.pp"},
+    ]
+
+    executor = UpgradeExecutor.UpgradeExecutor(pythonExecutor,
+        puppetExecutor, AmbariConfig.AmbariConfig().getConfig())
+
+    result= executor.execute_dir(command, 'basedir', 'dir', 'tmpout', 'tmperr')
+    self.assertEquals(result['exitcode'],1)
+    self.assertEquals(result['stdout'],"\nstdout - first.py\nstdout - second.pp\nstdout - third.py\nUnrecognized file type, skipping: basedir/dir/fourth.nm\nstdout - fifth-failing.py")
+    self.assertEquals(result['stderr'],"\nstderr - first.py\nstderr - second.pp\nstderr - third.py\nNone\nstderr - fifth-failing.py")
+
+
+  @patch('os.listdir')
+  @patch('os.path.isdir')
+  def test_execute_dir_ignore_badly_named(self, isdir_method, listdir_method):
+    pythonExecutor = MagicMock()
+    puppetExecutor = MagicMock()
+
+    command = {'debug': 'command'}
+    isdir_method.return_value = True
+    files = ['00-first.py', 'badly-named.pp', '10-second.pp', '20-wrong.cpp']
+    listdir_method.return_value = files
+    # fifth-failing.py will fail
+    pythonExecutor.run_file.side_effect = [
+      {'exitcode' : 0,
+       'stdout'   : "stdout - python.py",
+       'stderr'   : "stderr - python.py"},
+    ]
+    puppetExecutor.just_run_one_file.side_effect = [
+      {'exitcode' : 0,
+       'stdout'   : "stdout - puppet.pp",
+       'stderr'   : "stderr - puppet.pp"},
+    ]
+
+    executor = UpgradeExecutor.UpgradeExecutor(pythonExecutor,
+        puppetExecutor, AmbariConfig.AmbariConfig().getConfig())
+
+    result= executor.execute_dir(command, 'basedir', 'dir', 'tmpout', 'tmperr')
+    self.assertEquals(result['exitcode'],0)
+    self.assertEquals(result['stdout'],'\nstdout - python.py\nstdout - puppet.pp\nUnrecognized file type, skipping: basedir/dir/20-wrong.cpp')
+    self.assertEquals(result['stderr'],'\nstderr - python.py\nstderr - puppet.pp\nNone')
+
+if __name__ == "__main__":
+  unittest.main(verbosity=2)

+ 4 - 0
ambari-agent/src/test/python/dummy_files/dummy_current_stack

@@ -0,0 +1,4 @@
+HDFS    HDP-1.2.0
+NAGIOS    HDP-1.2.1
+HBASE    HDP-1.3.0
+GANGLIA  HDP-1.2.2