TestShell.py 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. '''
  4. Licensed to the Apache Software Foundation (ASF) under one
  5. or more contributor license agreements. See the NOTICE file
  6. distributed with this work for additional information
  7. regarding copyright ownership. The ASF licenses this file
  8. to you under the Apache License, Version 2.0 (the
  9. "License"); you may not use this file except in compliance
  10. with the License. You may obtain a copy of the License at
  11. http://www.apache.org/licenses/LICENSE-2.0
  12. Unless required by applicable law or agreed to in writing, software
  13. distributed under the License is distributed on an "AS IS" BASIS,
  14. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. See the License for the specific language governing permissions and
  16. limitations under the License.
  17. '''
  18. import os
  19. import unittest
  20. import tempfile
  21. from mock.mock import patch, MagicMock, call
  22. from ambari_agent.AmbariConfig import AmbariConfig
  23. from ambari_agent import shell
  24. from shell import shellRunner
  25. from sys import platform as _platform
  26. from only_for_platform import only_for_platform, PLATFORM_LINUX
  27. import subprocess, time
  28. @only_for_platform(PLATFORM_LINUX)
  29. class TestShell(unittest.TestCase):
  30. @patch("os.setuid")
  31. def test_changeUid(self, os_setUIDMock):
  32. shell.threadLocal.uid = 9999
  33. shell.changeUid()
  34. self.assertTrue(os_setUIDMock.called)
  35. @patch("pwd.getpwnam")
  36. def test_shellRunner_run(self, getpwnamMock):
  37. sh = shellRunner()
  38. result = sh.run(['echo'])
  39. self.assertEquals(result['exitCode'], 0)
  40. self.assertEquals(result['error'], '')
  41. getpwnamMock.return_value = [os.getuid(), os.getuid(), os.getuid()]
  42. result = sh.run(['echo'], 'non_exist_user_name')
  43. self.assertEquals(result['exitCode'], 0)
  44. self.assertEquals(result['error'], '')
  45. def test_kill_process_with_children(self):
  46. if _platform == "linux" or _platform == "linux2": # Test is Linux-specific
  47. gracefull_kill_delay_old = shell.gracefull_kill_delay
  48. shell.gracefull_kill_delay = 0.1
  49. sleep_cmd = "sleep 314159265"
  50. test_cmd = """ (({0}) & ({0} & {0})) """.format(sleep_cmd)
  51. # Starting process tree (multiple process groups)
  52. test_process = subprocess.Popen(test_cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True)
  53. time.sleep(0.3) # Delay to allow subprocess to start
  54. # Check if processes are running
  55. ps_cmd = """ps aux """
  56. ps_process = subprocess.Popen(ps_cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True)
  57. (out, err) = ps_process.communicate()
  58. self.assertTrue(sleep_cmd in out)
  59. # Kill test process
  60. shell.kill_process_with_children(test_process.pid)
  61. test_process.communicate()
  62. # Now test process should not be running
  63. ps_process = subprocess.Popen(ps_cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True)
  64. (out, err) = ps_process.communicate()
  65. self.assertFalse(sleep_cmd in out)
  66. shell.gracefull_kill_delay = gracefull_kill_delay_old
  67. else:
  68. # Do not run under other systems
  69. pass