瀏覽代碼

AMBARI-892. Add puppet executor at the agent to be able to run various commands from the server. (mahadev)

git-svn-id: https://svn.apache.org/repos/asf/incubator/ambari/branches/AMBARI-666@1400792 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 12 年之前
父節點
當前提交
6623689678
共有 54 個文件被更改,包括 1635 次插入645 次删除
  1. 3 0
      AMBARI-666-CHANGES.txt
  2. 5 2
      ambari-agent/src/main/puppet/modules/configgenerator/manifests/configfile.pp
  3. 29 25
      ambari-agent/src/main/puppet/modules/hdp-hadoop/manifests/init.pp
  4. 191 0
      ambari-agent/src/main/puppet/modules/hdp/manifests/lib/puppet/parser/functions/pkgName.rb
  5. 11 4
      ambari-agent/src/main/python/ambari_agent/ActionQueue.py
  6. 1 0
      ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
  7. 42 18
      ambari-agent/src/main/python/ambari_agent/manifestGenerator.py
  8. 114 0
      ambari-agent/src/main/python/ambari_agent/puppetExecutor.py
  9. 40 6
      ambari-agent/src/main/python/ambari_agent/site.pp
  10. 16 4
      ambari-agent/src/main/python/ambari_agent/test.json
  11. 37 0
      ambari-agent/src/test/python/TestPuppetExecutor.py
  12. 1 0
      ambari-server/src/main/conf/ambari.properties
  13. 135 17
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
  14. 117 14
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
  15. 25 0
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommandFactory.java
  16. 87 2
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
  17. 28 0
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/StageFactory.java
  18. 12 1
      ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java
  19. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/agent/rest/AgentResource.java
  20. 117 32
      ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
  21. 11 0
      ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
  22. 4 3
      ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
  23. 0 61
      ambari-server/src/main/java/org/apache/ambari/server/orm/dao/ActionStatusDAO.java
  24. 60 0
      ambari-server/src/main/java/org/apache/ambari/server/orm/dao/DaoUtils.java
  25. 19 2
      ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostDAO.java
  26. 36 0
      ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
  27. 39 0
      ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
  28. 0 209
      ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionStatusEntity.java
  29. 0 110
      ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionStatusEntityPK.java
  30. 2 1
      ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ClusterEntity.java
  31. 6 7
      ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java
  32. 2 1
      ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostComponentMappingEntity.java
  33. 17 5
      ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostEntity.java
  34. 25 20
      ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java
  35. 12 9
      ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java
  36. 15 10
      ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntityPK.java
  37. 2 1
      ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ServiceComponentConfigEntity.java
  38. 2 1
      ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ServiceComponentHostConfigEntity.java
  39. 2 1
      ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ServiceConfigEntity.java
  40. 9 10
      ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
  41. 6 6
      ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntityPK.java
  42. 2 2
      ambari-server/src/main/java/org/apache/ambari/server/security/CertificateManager.java
  43. 30 0
      ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEvent.java
  44. 3 3
      ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java
  45. 8 0
      ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java
  46. 3 3
      ambari-server/src/main/python/ambari-server-state/core-site.xml
  47. 23 22
      ambari-server/src/main/resources/Ambari-DDL.sql
  48. 1 2
      ambari-server/src/main/resources/META-INF/persistence.xml
  49. 4 0
      ambari-server/src/main/resources/db/newcerts/.gitignore
  50. 12 0
      ambari-server/src/main/resources/stacks/HDP/0.1/repos/repoinfo.xml
  51. 6 0
      ambari-server/src/main/resources/stacks/HDP/0.1/services/HDFS/metainfo.xml
  52. 144 0
      ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
  53. 50 12
      ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java
  54. 68 18
      ambari-server/src/test/java/org/apache/ambari/server/orm/TestOrmImpl.java

+ 3 - 0
AMBARI-666-CHANGES.txt

@@ -12,6 +12,9 @@ AMBARI-666 branch (unreleased changes)
 
   NEW FEATURES
 
+  AMBARI-892. Add puppet executor at the agent to be able to run various
+  commands from the server. (mahadev)
+
   AMBARI-887. Ability to save configuration. (Nate Cole via mahadev)
 
   AMBARI-877. Refactor resource provider implementation for changes to

+ 5 - 2
ambari-agent/src/main/puppet/modules/configgenerator/manifests/configfile.pp

@@ -53,9 +53,12 @@ define configgenerator::configfile ($modulespath='/etc/puppet/modules', $filenam
   <% end -%>
 </configuration>')
  
-file {'config':
+
+debug("Generating config: ${modulespath}/${filename}")
+
+file {"${modulespath}/${filename}":
   ensure  => present,
   content => $configcontent,
-  path => "${modulespath}/${module}/templates/${filename}",
+  path => "${modulespath}/${filename}",
 }
 } 

+ 29 - 25
ambari-agent/src/main/puppet/modules/hdp-hadoop/manifests/init.pp

@@ -40,51 +40,60 @@ class hdp-hadoop::initialize()
 
 # Configs generation  
 
-  if has_key($configuration, 'hdp_hadoop__mapred_queue_acls') {
-    configgenerator::configfile{'mapred_queue_acls_xml': 
+debug('##Configs generation for hdp-hadoop')
+
+
+  if has_key($configuration, 'mapred_queue_acls') {
+    configgenerator::configfile{'mapred_queue_acls': 
+      modulespath => $hdp-hadoop::params::conf_dir,
       filename => 'mapred-queue-acls.xml',
       module => 'hdp-hadoop',
-      configuration => $configuration['hdp_hadoop__mapred_queue_acls']
+      configuration => $configuration['mapred_queue_acls']
     }
   }
   
-  if has_key($configuration, 'hdp_hadoop__hadoop_policy') {
-    configgenerator::configfile{'hadoop_policy_xml': 
+  if has_key($configuration, 'hadoop_policy') {
+    configgenerator::configfile{'hadoop_policy': 
+      modulespath => $hdp-hadoop::params::conf_dir,
       filename => 'hadoop-policy.xml',
       module => 'hdp-hadoop',
-      configuration => $configuration['hdp_hadoop__hadoop_policy']
+      configuration => $configuration['hadoop_policy']
     }
   }
-  
-  if has_key($configuration, 'hdp_hadoop__core_site') {
-      configgenerator::configfile{'core_site_xml': 
+
+  if has_key($configuration, 'core_site') {
+      configgenerator::configfile{'core_site': 
+        modulespath => $hdp-hadoop::params::conf_dir,
         filename => 'core-site.xml',
         module => 'hdp-hadoop',
-        configuration => $configuration['hdp_hadoop__core_site']
+        configuration => $configuration['core_site']
       }
     }
 
-  if has_key($configuration, 'hdp_hadoop__mapred_site') {
-    configgenerator::configfile{'mapred_site_xml': 
+  if has_key($configuration, 'mapred_site') {
+    configgenerator::configfile{'mapred_site': 
+      modulespath => $hdp-hadoop::params::conf_dir,
       filename => 'mapred-site.xml',
       module => 'hdp-hadoop',
-      configuration => $configuration['hdp_hadoop__mapred_site']
+      configuration => $configuration['mapred_site']
     }
   }
   
-  if has_key($configuration, 'hdp_hadoop__capacity_scheduler') {
-    configgenerator::configfile{'capacity_scheduler_xml': 
+  if has_key($configuration, 'capacity_scheduler') {
+    configgenerator::configfile{'capacity_scheduler': 
+      modulespath => $hdp-hadoop::params::conf_dir,
       filename => 'capacity-scheduler.xml',
       module => 'hdp-hadoop',
-      configuration => $configuration['hdp_hadoop__capacity_scheduler']
+      configuration => $configuration['capacity_scheduler']
     }
   }
 
-  if has_key($configuration, 'hdp_hadoop__hdfs_site') {
-    configgenerator::configfile{'hdfs_site_xml': 
+  if has_key($configuration, 'hdfs_site') {
+    configgenerator::configfile{'hdfs_site': 
+      modulespath => $hdp-hadoop::params::conf_dir,
       filename => 'hdfs-site.xml',
       module => 'hdp-hadoop',
-      configuration => $configuration['hdp_hadoop__hdfs_site']
+      configuration => $configuration['hdfs_site']
     }
   }
 }
@@ -155,7 +164,7 @@ class hdp-hadoop(
       mode  => $tc_mode
     }
 
-    $template_files = ['hadoop-env.sh','core-site.xml','hadoop-policy.xml','health_check','capacity-scheduler.xml','commons-logging.properties','log4j.properties','mapred-queue-acls.xml','slaves']
+    $template_files = [ 'hadoop-env.sh', 'health_check', 'commons-logging.properties', 'log4j.properties', 'slaves']
     hdp-hadoop::configfile { $template_files:
       tag   => 'common', 
       owner => $hdfs_user
@@ -166,11 +175,6 @@ class hdp-hadoop(
       owner => $hdfs_user,
     }
 
-    hdp-hadoop::configfile { 'mapred-site.xml': 
-      tag => 'common', 
-      owner => $mapred_user
-    }
-
     Anchor['hdp-hadoop::begin'] -> Hdp-hadoop::Package<||> ->  Hdp::Directory_recursive_create[$hadoop_config_dir] ->  Hdp::User<|title == $hdfs_user or title == $mapred_user|> 
     -> Hdp-hadoop::Configfile<|tag == 'common'|> -> Anchor['hdp-hadoop::end']
     Anchor['hdp-hadoop::begin'] -> Hdp::Directory_recursive_create[$logdirprefix] -> Anchor['hdp-hadoop::end']

+ 191 - 0
ambari-agent/src/main/puppet/modules/hdp/manifests/lib/puppet/parser/functions/pkgName.rb

@@ -0,0 +1,191 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+# Returns package name(s) for the specific OS and architecture
+# Params:
+#
+# - name of the package
+# - OS name
+# - OS architecture
+#
+# If there are no approprite OS/architecture, it will search default entries (named as ALL)
+module Puppet::Parser::Functions
+  newfunction(:pkgName, :type => :rvalue) do |args|
+    packageName = args[0]
+    os = args[1]
+    arch = args[2]
+    ALL = 'ALL'
+
+    # First level (packages): packageName => OS hashmap
+    # Second level (OS hashmap): Architecture => real package name(s)
+    packages = {
+      'ganglia-monitor' => {
+        'ALL' => {
+          64 => 'ganglia-gmond-3.2.0'
+        }
+      },
+      'ganglia-server' => {
+        'ALL' => {
+          64 => 'ganglia-gmetad-3.2.0'
+        }
+      },
+      'ganglia-gweb' => {
+        'ALL' => {
+          64 => 'gweb'
+        }
+      },
+      'ganglia-hdp-gweb-addons' => {
+        ALL => {
+          64 => 'hdp_mon_ganglia_addons'
+        }
+      },
+      'glibc' => {
+        'rhel6' => {
+          ALL => ['glibc','glibc.i686']
+        }
+      },
+      'nagios-addons' => {
+        ALL => {
+          64 => 'hdp_mon_nagios_addons'
+        }
+      },
+      'nagios-server' => {
+        ALL => {
+          64 => 'nagios-3.2.3'
+        }
+      },
+      'nagios-plugins' => {
+        ALL => {
+          64 => 'nagios-plugins-1.4.9'
+        }
+      },
+      'nagios-fping' => {
+        ALL => {
+          64 =>'fping'
+        }
+      },
+      'nagios-php-pecl-json' => {
+        ALL => {
+          64 => 'php-pecl-json.x86_64'
+        }
+      },
+      'snmp' => {
+        ALL => {
+          64 => ['net-snmp','net-snmp-utils'],
+        }
+      },
+      'dashboard' => {
+        ALL => {
+          64 => 'hdp_mon_dashboard'
+        }
+      },
+      'templeton' => {
+        ALL => {
+          ALL => 'templeton'
+        }
+      },
+      'oozie-client' => {
+        ALL => {
+          64 => 'oozie-client.noarch'
+        }
+      },
+      'oozie-server' => {
+        ALL => {
+          64 => 'oozie.noarch'
+        }
+      },
+      'lzo' => {
+        'rhel5' => {
+          ALL => ['lzo','lzo.i386','lzo-devel','lzo-devel.i386']
+        },
+        'rhel6' => {
+          ALL => ['lzo','lzo.i686','lzo-devel','lzo-devel.i686']
+        }
+      },
+      #TODO: make these two consistent on whether case of 64/32 bits
+      'snappy' => {
+        ALL => {
+          32 =>  ['snappy','snappy-devel'],
+          64 => ['snappy','snappy-devel']
+        }
+      },
+      'mysql' => {
+        ALL => {
+          32 =>  ['mysql','mysql-server']
+        }
+      },
+      'mysql-connector' => {
+        ALL => {
+          64 =>  ['mysql-connector-java']
+        }
+      },
+      'extjs' => {
+        ALL => {
+          64 =>  ['extjs-2.2-1']
+        }
+      },
+      'templeton-tar-hive' => {
+        ALL => {
+          64 => ['templeton-tar-hive-0.0.1.14-1']
+        }
+      },
+      'templeton-tar-pig' => {
+        ALL => {
+          64 => ['templeton-tar-pig-0.0.1.14-1']
+        }
+      }
+    }
+    ########################################################
+    ########################################################
+    # seeking package hashmap
+    pkgHash = nil
+    
+    if has_key(packages, packageName) 
+      pkgHash = packages[packageName]
+    else 
+      print "Wrong package name: " + packageName
+      return nil
+    end
+    
+    # seeking os hashmap
+    osHash = nil
+    
+    if has_key(pkgHash, os) 
+      osHash = pkgHash[os]
+    elsif has_key(pkgHash, ALL) 
+      osHash = pkgHash[ALL]
+    else 
+      print "Wrong package name: " + packageName
+      return nil
+    end
+    
+    #seeking arhitecture 
+    result = nil
+    
+    if has_key(osHash, arch) 
+      result = osHash[arch]
+    elsif has_key(osHash, ALL)
+      result = osHash[ALL]
+    end
+    
+    return result
+  end
+end

+ 11 - 4
ambari-agent/src/main/python/ambari_agent/ActionQueue.py

@@ -33,11 +33,14 @@ import os
 import time
 import subprocess
 import copy
+import puppetExecutor
 
 logger = logging.getLogger()
 installScriptHash = -1
 
 class ActionQueue(threading.Thread):
+  """ Action Queue for the agent. We pick one command at a time from the queue
+  and execute that """
   global commandQueue, resultQueue
   commandQueue = Queue.Queue()
   resultQueue = Queue.Queue()
@@ -50,7 +53,10 @@ class ActionQueue(threading.Thread):
     self._stop = threading.Event()
     self.maxRetries = config.getint('command', 'maxretries') 
     self.sleepInterval = config.getint('command', 'sleepBetweenRetries')
-
+    self.executor = puppetExecutor.puppetExecutor(config.get('puppet', 'puppetmodules'),
+                                   config.get('puppet', 'puppet_home'),
+                                   config.get('puppet', 'facter_home'),
+                                   config.get('agent', 'prefix'))
   def stop(self):
     self._stop.set()
 
@@ -112,13 +118,14 @@ class ActionQueue(threading.Thread):
     serviceName = command['serviceName']
     configurations = command['configurations']
     result = []
+    commandresult = self.executor.runCommand(command)
     # assume some puppet pluing to run these commands
     roleResult = {'role' : command['role'],
                   'actionId' : commandId,
-                  'stdout' : "DONE",
+                  'stdout' : commandresult['stdout'],
                   'clusterName' : clusterName,
-                  'stderr' : "DONE",
-                  'exitCode' : 0,
+                  'stderr' : commandresult['stderr'],
+                  'exitCode' : commandresult['exitcode'],
                   'serviceName' : serviceName,
                   'status' : "COMPLETED"}
     result.append(roleResult)

+ 1 - 0
ambari-agent/src/main/python/ambari_agent/AmbariConfig.py

@@ -37,6 +37,7 @@ prefix=/tmp/ambari
 installprefix=/var/ambari/
 
 [puppet]
+puppetmodules=/var/lib/ambari-agent/puppet/
 puppet_home=/root/workspace/puppet-install/puppet-2.7.9
 facter_home=/root/workspace/puppet-install/facter-1.6.10
 

+ 42 - 18
ambari-agent/src/main/python/ambari_agent/manifestGenerator.py

@@ -24,21 +24,19 @@ import logging
 
 logger = logging.getLogger()
 
-  #read static imports from file and write them to manifest
-def writeImports(outputFile, inputFileName='imports.txt'):
+#read static imports from file and write them to manifest
+def writeImports(outputFile, modulesdir, inputFileName='imports.txt'):
   inputFile = open(inputFileName, 'r')
-  modulesdir = os.path.abspath(os.getcwd() + "../../../puppet/modules/")
   logger.info("Modules dir is " + modulesdir)
   for line in inputFile:
-    modulename = line.rstrip('\n')
-    line = "import '" + modulesdir + "/" + modulename + "'\n"
+    modulename = line.rstrip()
+    line = "import '" + modulesdir + os.sep + modulename + "'" + os.linesep
     outputFile.write(line)
     
   inputFile.close()
 
-def generateManifest(inputJsonStr):
+def generateManifest(parsedJson, fileName, modulesdir):
 #reading json
-  parsedJson = json.loads(inputJsonStr)
   hostname = parsedJson['hostname']
   clusterHostInfo = parsedJson['clusterHostInfo']
   params = parsedJson['params']
@@ -46,11 +44,11 @@ def generateManifest(inputJsonStr):
   #hostAttributes = parsedJson['hostAttributes']
   roles = parsedJson['roleCommands']
   
-#writing manifest
-  manifest = open('site.pp', 'w')
+  #writing manifest
+  manifest = open(fileName, 'w')
 
   #writing imports from external static file
-  writeImports(manifest)
+  writeImports(outputFile=manifest, modulesdir=modulesdir)
   
   #writing nodes
   writeNodes(manifest, clusterHostInfo)
@@ -94,9 +92,25 @@ def writeNodes(outputFile, clusterHostInfo):
     outputFile.write(']\n')
 
 #write params
-def writeParams(outputFile, params):
-  for param in params.iterkeys():
-    outputFile.write('$' +  param + '="' + params[param] + '"\n')
+def writeParams(outputFile, params):
+
+  for paramName in params.iterkeys():
+
+    param = params[paramName]
+    if type(param) is dict:
+
+      outputFile.write('$' + paramName + '= {\n')
+
+      coma = ''
+
+      for subParam in param.iterkeys():
+        outputFile.write(coma + '"' + subParam + '" => "' + param[subParam] + '"')
+        coma = ',\n'
+
+      outputFile.write('\n}\n')
+    else:
+      outputFile.write('$' +  paramName + '="' + param + '"\n')
+    
 
 #write host attributes
 def writeHostAttributes(outputFile, hostAttributes):
@@ -177,8 +191,18 @@ def writeStages(outputFile, numStages):
   
   outputFile.write('\n')
     
-logging.basicConfig(level=logging.DEBUG)    
-#test code
-jsonFile = open('test.json', 'r')
-jsonStr = jsonFile.read() 
-generateManifest(jsonStr)
+
+def main():
+  logging.basicConfig(level=logging.DEBUG)    
+  #test code
+  jsonFile = open('test.json', 'r')
+  jsonStr = jsonFile.read() 
+  modulesdir = os.path.abspath(os.getcwd() + ".." + os.sep + ".." + 
+                               os.sep + ".." + os.sep + "puppet" + 
+                               os.sep + "modules" + os.sep)
+  parsedJson = json.loads(inputJsonStr)
+  generateManifest(parsedJson, 'site.pp', modulesdir)
+
+if __name__ == '__main__':
+  main()
+

+ 114 - 0
ambari-agent/src/main/python/ambari_agent/puppetExecutor.py

@@ -0,0 +1,114 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import json
+import os.path
+import logging
+import subprocess
+from manifestGenerator import generateManifest
+import pprint
+
+logger = logging.getLogger()
+
+class puppetExecutor:
+  """ Class that executes the commands that come from the server using puppet.
+  This is the class that provides the pluggable point for executing the puppet"""
+  
+  def __init__(self, puppetModule, puppetInstall, facterInstall, tmpDir):
+    self.puppetModule = puppetModule
+    self.puppetInstall = puppetInstall
+    self.facterInstall = facterInstall
+    self.tmpDir = tmpDir
+
+  def getPuppetBinary(self):
+    return os.path.join(self.puppetInstall, "bin", "puppet") 
+     
+  def puppetCommand(self, sitepp):
+    modules = self.puppetModule
+    puppetcommand = [];
+    puppetcommand.append(self.getPuppetBinary())
+    puppetcommand.append("apply")
+    puppetcommand.append("--confdir=" + modules)
+    puppetcommand.append("--detailed-exitcodes")
+    puppetcommand.append(sitepp)
+    return puppetcommand
+  
+  def facterLib(self):
+    return self.facterInstall + "/lib/"
+    pass
+  
+  def puppetLib(self):
+    return self.puppetInstall + "/lib"
+    pass
+      
+  def runCommand(self, command):
+    result = {}
+    taskId = "0";
+    if command.has_key("taskId"):
+      taskId = command['taskId']
+      
+    puppetEnv = os.environ
+    siteppFileName = os.path.join(self.tmpDir, "site-" + taskId + ".pp") 
+    generateManifest(command, siteppFileName, self.puppetModule + "/modules")
+    puppetcommand = self.puppetCommand(siteppFileName)
+    """ Run the command and make sure the output gets propagated"""
+    rubyLib = ""
+    if os.environ.has_key("RUBYLIB"):
+      rubyLib = os.environ["RUBYLIB"]
+      logger.info("Ruby Lib env from Env " + rubyLib)
+    rubyLib = rubyLib + ":" + self.facterLib() + ":" + self.puppetLib()
+    puppetEnv["RUBYLIB"] = rubyLib
+    logger.info("Setting RUBYLIB as: " + rubyLib)
+    logger.info("Running command " + pprint.pformat(puppetcommand))
+    puppet = subprocess.Popen(puppetcommand,
+                                  stdout=subprocess.PIPE,
+                                  stderr=subprocess.PIPE,
+                                  env=puppetEnv)
+    stderr_out = puppet.communicate()
+    error = "none"
+    if puppet.returncode != 0:
+      error = stderr_out[1]
+      result["stderr"] = error
+      logging.error("Error running puppet: " + stderr_out[1])
+      pass
+    puppetOutput = stderr_out[0]
+    result["exitcode"] = puppet.returncode
+    result["stdout"] = puppetOutput
+    logger.info("ExitCode : \n"  + str(result["exitcode"]))
+    return result
+ 
+def main():
+  logging.basicConfig(level=logging.DEBUG)    
+  #test code
+  jsonFile = open('test.json', 'r')
+  jsonStr = jsonFile.read() 
+  # Below is for testing only.
+  
+  puppetInstance = puppetExecutor("/root/workspace/ambari-workspace/ambari-git/ambari-agent/src/main/puppet/",
+                                  "/root/workspace/puppet-install/puppet-2.7.9",
+                                  "/root/workspace/puppet-install/facter-1.6.10/",
+                                  "/tmp")
+  jsonFile = open('test.json', 'r')
+  jsonStr = jsonFile.read() 
+  parsedJson = json.loads(jsonStr)
+  puppetInstance.runCommand(parsedJson)
+  
+if __name__ == '__main__':
+  main()
+

+ 40 - 6
ambari-agent/src/main/python/ambari_agent/site.pp

@@ -1,19 +1,53 @@
+import '/home/centos/ambari/ambari-agent/src/main/puppet/modules/hdp/manifests/*.pp'
+import '/home/centos/ambari/ambari-agent/src/main/puppet/modules/hdp-hadoop/manifests/*.pp'
+import '/home/centos/ambari/ambari-agent/src/main/puppet/modules/hdp-hbase/manifests/*.pp'
+import '/home/centos/ambari/ambari-agent/src/main/puppet/modules/hdp-zookeeper/manifests/*.pp'
+import '/home/centos/ambari/ambari-agent/src/main/puppet/modules/hdp-oozie/manifests/*.pp'
+import '/home/centos/ambari/ambari-agent/src/main/puppet/modules/hdp-pig/manifests/*.pp'
+import '/home/centos/ambari/ambari-agent/src/main/puppet/modules/hdp-sqoop/manifests/*.pp'
+import '/home/centos/ambari/ambari-agent/src/main/puppet/modules/hdp-templeton/manifests/*.pp'
+import '/home/centos/ambari/ambari-agent/src/main/puppet/modules/hdp-hive/manifests/*.pp'
+import '/home/centos/ambari/ambari-agent/src/main/puppet/modules/hdp-hcat/manifests/*.pp'
+import '/home/centos/ambari/ambari-agent/src/main/puppet/modules/hdp-mysql/manifests/*.pp'
+import '/home/centos/ambari/ambari-agent/src/main/puppet/modules/hdp-monitor-webserver/manifests/*.pp'
 $NAMENODE= ['h2.hortonworks.com']
 $DATANODE= ['h1.hortonworks.com', 'h2.hortonworks.com']
 $hdfs_user="hdfs"
-$jdk_location="lah/blah"
+$jdk_location="http://hdp1/downloads"
+$jdk_bins= {
+"32" => "jdk-6u31-linux-x64.bin",
+"64" => "jdk-6u31-linux-x64.bin"
+}
+$java32_home="/usr/jdk64/jdk1.6.0_31"
+$java64_home="/usr/jdk64/jdk1.6.0_31"
 $configuration =  {
+capacity_scheduler=> {
+"mapred.capacity-scheduler.queue.default.capacity" => "100",
+"mapred.capacity-scheduler.queue.default.supports-priorit" => "false"
+},
+core_site=> {
+"fs.default.name" => "hrt8n36.cc1.ygridcore.net"
+},
+hadoop_policy=> {
+"security.client.datanode.protocol.acl" => "*",
+"security.client.protocol.acl" => "*"
+},
+mapred_queue_acls=> {
+"mapred.queue.default.acl-submit-job" => "*",
+"mapred.queue.default.acl-administer-jobs" => "*"
+},
 hdfs_site=> {
 "dfs.block.size" => "256000000",
 "dfs.replication" => "1"
 },
-core_site=> {
-"fs.default.name" => "hrt8n36.cc1.ygridcore.net"
+mapred_site=> {
+"mapred.queue.names" => "hive,pig,default",
+"mapred.jobtracker.taskScheduler" => "org.apache.hadoop.mapred.CapacityTaskScheduler"
 },
 
 }
 node /default/ {
- stage{0 :} -> stage{1 :}
-class {'hdp-hadoop::datanode': stage => 1, service_state => running}
-class {'hdp-hadoop::namenode': stage => 2, service_state => installed_and_configured}
+ stage{1 :} -> stage{2 :}
+class {'hdp-hadoop::namenode': stage => 1, service_state => running}
+class {'hdp-hadoop::datanode': stage => 2, service_state => installed_and_configured}
 }

+ 16 - 4
ambari-agent/src/main/python/ambari_agent/test.json

@@ -10,21 +10,33 @@
 "params": 
 {
 "hdfs_user" : "hdfs",
-"jdk_location": "lah/blah"
+"jdk_location" : "http://hdp1/downloads",
+
+"java32_home" : "/usr/jdk64/jdk1.6.0_31",
+"java64_home" : "/usr/jdk64/jdk1.6.0_31",
+"jdk_bins" :  { "32" : "jdk-6u31-linux-x64.bin", "64" : "jdk-6u31-linux-x64.bin" }
 },
 "configurations" : {
 "hdfs_site" : { "dfs.block.size" : "256000000", "dfs.replication" : "1" } ,
-"core_site" :  { "fs.default.name" : "hrt8n36.cc1.ygridcore.net" }
+"core_site" :  { "fs.default.name" : "hrt8n36.cc1.ygridcore.net" } ,
+"mapred_queue_acls" : {"mapred.queue.default.acl-submit-job" : "*",
+		       "mapred.queue.default.acl-administer-jobs" : "*"},
+"hadoop_policy" : {"security.client.protocol.acl" : "*",
+		   "security.client.datanode.protocol.acl" : "*"},
+"mapred_site" : {"mapred.jobtracker.taskScheduler" : "org.apache.hadoop.mapred.CapacityTaskScheduler",
+		 "mapred.queue.names" : "hive,pig,default"},
+"capacity_scheduler" : {"mapred.capacity-scheduler.queue.default.capacity" : "100",
+			"mapred.capacity-scheduler.queue.default.supports-priorit" : "false"}
 },
 "roleCommands": [
 {
-"role" : "DATANODE", 
+"role" : "NAMENODE", 
 "cmd": "START",
 "roleParams" : {
 }
 },
 {
-"role": "NAMENODE",
+"role": "DATANODE",
 "cmd": "INSTALL",
 "roleParams" : {}
 }

+ 37 - 0
ambari-agent/src/test/python/TestPuppetExecutor.py

@@ -0,0 +1,37 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+from puppetExecutor import puppetExecutor
+from pprint import pformat
+import sys
+
+class TestPuppetExecutor(TestCase):
+  def test_build(self):
+    puppetexecutor = puppetExecutor("/tmp", "/x", "/y", "/z")
+    command = puppetexecutor.puppetCommand("site.pp")
+    self.assertEquals("/x/bin/puppet", command[0], "puppet binary wrong")
+    self.assertEquals("apply", command[1], "local apply called")
+    self.assertEquals("--confdir=/tmp", command[2],"conf dir tmp")
+    self.assertEquals("--detailed-exitcodes", command[3], "make sure output \
+    correct")
+    
+    
+    

+ 1 - 0
ambari-server/src/main/conf/ambari.properties

@@ -0,0 +1 @@
+metadata.path=src/main/resources/stacks

+ 135 - 17
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java

@@ -17,18 +17,55 @@
  */
 package org.apache.ambari.server.actionmanager;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.persist.Transactional;
+import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.agent.CommandReport;
 
 import com.google.inject.Singleton;
+import org.apache.ambari.server.orm.dao.*;
+import org.apache.ambari.server.orm.entities.*;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Singleton
 public class ActionDBAccessorImpl implements ActionDBAccessor {
+  private static final Logger log = LoggerFactory.getLogger(ActionDBAccessorImpl.class);
+
+  @Inject
+  private ClusterDAO clusterDAO;
+  @Inject
+  private HostDAO hostDAO;
+  @Inject
+  private StageDAO stageDAO;
+  @Inject
+  private HostRoleCommandDAO hostRoleCommandDAO;
+  @Inject
+  private ExecutionCommandDAO executionCommandDAO;
+  @Inject
+  private RoleSuccessCriteriaDAO roleSuccessCriteriaDAO;
+  @Inject
+  private StageFactory stageFactory;
+  @Inject
+  private Clusters clusters;
+
+  private final long requestId;
+
+  @Inject
+  public ActionDBAccessorImpl(Injector injector) {
+    injector.injectMembers(this);
+    requestId = stageDAO.getLastRequestId();
+
 
-  public ActionDBAccessorImpl() {
-    //this.stageId = greatest stage id in the database + 1
   }
 
   /* (non-Javadoc)
@@ -36,7 +73,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
    */
   @Override
   public Stage getAction(String actionId) {
-    return null;
+    return stageFactory.createExisting(actionId);
   }
 
   /* (non-Javadoc)
@@ -44,7 +81,11 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
    */
   @Override
   public List<Stage> getAllStages(long requestId) {
-    return null;
+    List<Stage> stages = new ArrayList<Stage>();
+    for (StageEntity stageEntity : stageDAO.findByRequestId(requestId)) {
+      stages.add(stageFactory.createExisting(stageEntity));
+    }
+    return stages;
   }
 
   /* (non-Javadoc)
@@ -52,15 +93,22 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
    */
   @Override
   public void abortOperation(long requestId) {
-    //Mark all pending or queued actions for this request as aborted.
+    Collection<HostRoleStatus> sourceStatuses = Arrays.asList(HostRoleStatus.QUEUED, HostRoleStatus.IN_PROGRESS, HostRoleStatus.PENDING);
+    int result = hostRoleCommandDAO.updateStatusByRequestId(requestId, HostRoleStatus.ABORTED, sourceStatuses);
+    log.info("Aborted {} commands", result);
   }
 
   /* (non-Javadoc)
    * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#timeoutHostRole(long, long, org.apache.ambari.server.Role)
    */
   @Override
+  @Transactional
   public void timeoutHostRole(String host, long requestId, long stageId, Role role) {
-    // TODO Auto-generated method stub
+    List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByHostRole(host, requestId, stageId, role);
+    for (HostRoleCommandEntity command : commands) {
+      command.setStatus(HostRoleStatus.TIMEDOUT);
+      hostRoleCommandDAO.merge(command);
+    }
   }
 
   /* (non-Javadoc)
@@ -68,40 +116,110 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
    */
   @Override
   public List<Stage> getStagesInProgress() {
-    return null;
+    List<Stage> stages = new ArrayList<Stage>();
+    for (StageEntity stageEntity : stageDAO.findByCommandStatuses(Arrays.asList(HostRoleStatus.IN_PROGRESS))) {
+      stages.add(stageFactory.createExisting(stageEntity));
+    }
+    return stages;
   }
 
   /* (non-Javadoc)
    * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#persistActions(java.util.List)
    */
   @Override
+  @Transactional
   public void persistActions(List<Stage> stages) {
-    // TODO Auto-generated method stub
-
+    for (Stage stage : stages) {
+      StageEntity stageEntity = stage.constructNewPersistenceEntity();
+      Cluster cluster;
+      try {
+        cluster = clusters.getCluster(stage.getClusterName());
+      } catch (AmbariException e) {
+        throw new RuntimeException(e);
+      }
+      ClusterEntity clusterEntity = clusterDAO.findById(cluster.getClusterId());
+
+      stageEntity.setCluster(clusterEntity);
+      clusterEntity.getStages().add(stageEntity);
+
+      for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) {
+        HostRoleCommandEntity hostRoleCommandEntity = hostRoleCommand.constructNewPersistenceEntity();
+        stageEntity.getHostRoleCommands().add(hostRoleCommandEntity);
+        hostRoleCommandEntity.setStage(stageEntity);
+
+        HostEntity hostEntity = hostDAO.findByName(hostRoleCommandEntity.getHostName());
+        if (hostEntity == null) {
+          log.error("Host {} doesn't exists in database", hostRoleCommandEntity.getHostName());
+          throw new RuntimeException("Host '"+hostRoleCommandEntity.getHostName()+"' doesn't exists in database");
+        }
+        hostEntity.getHostRoleCommandEntities().add(hostRoleCommandEntity);
+        hostRoleCommandEntity.setHost(hostEntity);
+        hostRoleCommandDAO.create(hostRoleCommandEntity);
+
+        assert hostRoleCommandEntity.getTaskId() != null;
+
+        hostRoleCommand.setTaskId(hostRoleCommandEntity.getTaskId());
+        ExecutionCommandEntity executionCommandEntity = hostRoleCommand.constructExecutionCommandEntity();
+        executionCommandEntity.setHostRoleCommand(hostRoleCommandEntity);
+        hostRoleCommandEntity.setExecutionCommand(executionCommandEntity);
+
+        executionCommandDAO.create(hostRoleCommandEntity.getExecutionCommand());
+        hostRoleCommandDAO.merge(hostRoleCommandEntity);
+        hostDAO.merge(hostEntity);
+      }
+
+      for (RoleSuccessCriteriaEntity roleSuccessCriteriaEntity : stageEntity.getRoleSuccessCriterias()) {
+        roleSuccessCriteriaDAO.create(roleSuccessCriteriaEntity);
+      }
+
+      stageDAO.create(stageEntity);
+      clusterDAO.merge(clusterEntity);
+    }
   }
 
   @Override
+  @Transactional
   public void updateHostRoleState(String hostname, long requestId,
       long stageId, String role, CommandReport report) {
-    // TODO Auto-generated method stub
-
+    List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByHostRole(hostname, requestId, stageId, Role.valueOf(role));
+    for (HostRoleCommandEntity command : commands) {
+      command.setStatus(HostRoleStatus.valueOf(report.getStatus()));
+      command.setStdOut(report.getStdOut());
+      command.setStdError(report.getStdErr());
+      command.setExitcode(report.getExitCode());
+      hostRoleCommandDAO.merge(command);
+    }
   }
 
   @Override
   public void abortHostRole(String host, long requestId, long stageId, Role role) {
-    // TODO Auto-generated method stub
-
+    CommandReport report = new CommandReport();
+    report.setExitCode(999);
+    report.setStdErr("Host Role in invalid state");
+    report.setStdOut("");
+    report.setStatus("ABORTED");
+    updateHostRoleState(host, requestId, stageId, role.toString(), report);
   }
 
   @Override
   public long getLastPersistedRequestIdWhenInitialized() {
-    // TODO Auto-generated method stub
-    return 0;
+    return requestId;
   }
 
   @Override
+  @Transactional
   public void hostRoleScheduled(Stage s, String hostname, String roleStr) {
-    //Update start time, last update time, host role status, number of attempts
-    //in the database.
+    HostRoleCommand hostRoleCommand = s.getHostRoleCommand(hostname, roleStr);
+    HostRoleCommandEntity entity = hostRoleCommandDAO.findByPK(hostRoleCommand.getTaskId());
+    if (entity != null) {
+      entity.setStartTime(hostRoleCommand.getStartTime());
+      entity.setLastAttemptTime(hostRoleCommand.getLastAttemptTime());
+      entity.setStatus(hostRoleCommand.getStatus());
+      entity.setAttemptCount(hostRoleCommand.getAttemptCount());
+      hostRoleCommandDAO.merge(entity);
+    } else {
+      throw new RuntimeException("HostRoleCommand is not persisted, cannot update:\n" + hostRoleCommand);
+    }
+
   }
 }

+ 117 - 14
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java

@@ -17,8 +17,20 @@
  */
 package org.apache.ambari.server.actionmanager;
 
+import com.google.inject.Injector;
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.AssistedInject;
 import org.apache.ambari.server.Role;
+import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.orm.entities.ExecutionCommandEntity;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
 import org.apache.ambari.server.state.ServiceComponentHostEvent;
+import org.apache.ambari.server.utils.StageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
 
 /**
  * This class encapsulates the information for an task on a host for a
@@ -27,7 +39,11 @@ import org.apache.ambari.server.state.ServiceComponentHostEvent;
  * track the request.
  * For the actual command refer {@link HostAction#commandToHost}
  */
-class HostRoleCommand {
+public class HostRoleCommand {
+  private static final Logger log = LoggerFactory.getLogger(HostRoleCommand.class);
+
+  private int taskId = -1;
+  private String hostName;
   private final Role role;
   private HostRoleStatus status = HostRoleStatus.PENDING;
   private String stdout = "";
@@ -38,12 +54,91 @@ class HostRoleCommand {
   private long lastAttemptTime = -1;
   private short attemptCount = 0;
 
+  private ExecutionCommand executionCommand;
+
   public HostRoleCommand(String host, Role role,
-      ServiceComponentHostEvent event) {
+                         ServiceComponentHostEvent event) {
+    this.hostName = host;
     this.role = role;
     this.event = event;
   }
 
+  @AssistedInject
+  public HostRoleCommand(@Assisted HostRoleCommandEntity hostRoleCommandEntity, Injector injector) {
+    taskId = hostRoleCommandEntity.getTaskId();
+    this.hostName = hostRoleCommandEntity.getHostName();
+    role = hostRoleCommandEntity.getRole();
+    status = hostRoleCommandEntity.getStatus();
+    stdout = hostRoleCommandEntity.getStdOut();
+    stderr = hostRoleCommandEntity.getStdError();
+    exitCode = hostRoleCommandEntity.getExitcode();
+    startTime = hostRoleCommandEntity.getStartTime();
+    lastAttemptTime = hostRoleCommandEntity.getLastAttemptTime();
+    attemptCount = hostRoleCommandEntity.getAttemptCount();
+
+    try {
+      log.info(hostRoleCommandEntity.getEvent());
+      event = StageUtils.fromJson(hostRoleCommandEntity.getEvent(), ServiceComponentHostEvent.class);
+      executionCommand = StageUtils.stringToExecutionCommand(hostRoleCommandEntity.getExecutionCommand().getCommand());
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to parse JSON string", e);
+    }
+
+  }
+
+  HostRoleCommandEntity constructNewPersistenceEntity() {
+    HostRoleCommandEntity hostRoleCommandEntity = new HostRoleCommandEntity();
+    hostRoleCommandEntity.setHostName(hostName);
+    hostRoleCommandEntity.setRole(role);
+    hostRoleCommandEntity.setStatus(status);
+    hostRoleCommandEntity.setStdError(stderr);
+    hostRoleCommandEntity.setExitcode(exitCode);
+    hostRoleCommandEntity.setStdOut(stdout);
+    hostRoleCommandEntity.setStartTime(startTime);
+    hostRoleCommandEntity.setLastAttemptTime(lastAttemptTime);
+    hostRoleCommandEntity.setAttemptCount(attemptCount);
+
+    try {
+      hostRoleCommandEntity.setEvent(StageUtils.jaxbToString(event));
+      ExecutionCommandEntity executionCommandEntity = new ExecutionCommandEntity();
+      executionCommandEntity.setCommand(StageUtils.jaxbToString(executionCommand));
+      executionCommandEntity.setHostRoleCommand(hostRoleCommandEntity);
+      hostRoleCommandEntity.setExecutionCommand(executionCommandEntity);
+    } catch (JAXBException e) {
+      throw new RuntimeException(e);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return hostRoleCommandEntity;
+  }
+
+  ExecutionCommandEntity constructExecutionCommandEntity(){
+    try {
+      ExecutionCommandEntity executionCommandEntity = new ExecutionCommandEntity();
+      executionCommandEntity.setCommand(StageUtils.jaxbToString(executionCommand));
+      return executionCommandEntity;
+    } catch (JAXBException e) {
+      throw new RuntimeException(e);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+
+  public int getTaskId() {
+    return taskId;
+  }
+
+  public void setTaskId(int taskId) {
+      this.taskId = taskId;
+      executionCommand.setTaskId(taskId);
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
   public Role getRole() {
     return role;
   }
@@ -107,12 +202,20 @@ class HostRoleCommand {
   public void incrementAttemptCount() {
     this.attemptCount++;
   }
-  
+
+  public ExecutionCommand getExecutionCommand() {
+    return executionCommand;
+  }
+
+  public void setExecutionCommand(ExecutionCommand executionCommand) {
+    this.executionCommand = executionCommand;
+  }
+
   @Override
   public int hashCode() {
     return role.hashCode();
   }
-  
+
   @Override
   public boolean equals(Object other) {
     if (!(other instanceof HostRoleCommand)) {
@@ -121,20 +224,20 @@ class HostRoleCommand {
     HostRoleCommand o = (HostRoleCommand) other;
     return this.role.equals(o.role);
   }
-  
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();
     builder.append("HostRoleCommand State:\n");
-    builder.append("  Role: "+role+"\n");
-    builder.append("  Status: "+status+"\n");
-    builder.append("  Event: "+event+"\n");
-    builder.append("  stdout: "+stdout+"\n");
-    builder.append("  stderr: "+stderr+"\n");
-    builder.append("  exitcode: "+exitCode+"\n");
-    builder.append("  Start time: " + startTime+"\n");
-    builder.append("  Last attempt time: "+lastAttemptTime+"\n");
-    builder.append("  attempt count: "+ attemptCount+"\n");
+    builder.append("  Role: " + role + "\n");
+    builder.append("  Status: " + status + "\n");
+    builder.append("  Event: " + event + "\n");
+    builder.append("  stdout: " + stdout + "\n");
+    builder.append("  stderr: " + stderr + "\n");
+    builder.append("  exitcode: " + exitCode + "\n");
+    builder.append("  Start time: " + startTime + "\n");
+    builder.append("  Last attempt time: " + lastAttemptTime + "\n");
+    builder.append("  attempt count: " + attemptCount + "\n");
     return builder.toString();
   }
 }

+ 25 - 0
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommandFactory.java

@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.actionmanager;
+
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
+
+public interface HostRoleCommandFactory {
+  HostRoleCommand createExisting(HostRoleCommandEntity hostRoleCommandEntity);
+}

+ 87 - 2
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java

@@ -23,9 +23,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import com.google.inject.Injector;
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.AssistedInject;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.orm.dao.HostDAO;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
+import org.apache.ambari.server.orm.dao.StageDAO;
+import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
+import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
 import org.apache.ambari.server.state.ServiceComponentHostEvent;
 import org.apache.ambari.server.utils.StageUtils;
 import org.mortbay.log.Log;
@@ -42,7 +52,7 @@ public class Stage {
   private Map<Role, Float> successFactors = new HashMap<Role, Float>();
 
   //Map of host to host-roles
-  private Map<String, Map<String, HostRoleCommand>> hostRoleCommands = 
+  Map<String, Map<String, HostRoleCommand>> hostRoleCommands =
       new TreeMap<String, Map<String, HostRoleCommand>>();
   private Map<String, List<ExecutionCommand>> commandsToSend = 
       new TreeMap<String, List<ExecutionCommand>>();
@@ -53,6 +63,79 @@ public class Stage {
     this.clusterName = clusterName;
   }
 
+  /**
+   * Creates Stage existing in database
+   * @param actionId "requestId-stageId" string
+   */
+  @AssistedInject
+  public Stage(@Assisted String actionId, Injector injector) {
+    this(injector.getInstance(StageDAO.class).findByActionId(actionId), injector);
+  }
+
+  @AssistedInject
+  public Stage(@Assisted StageEntity stageEntity, Injector injector) {
+    HostRoleCommandDAO hostRoleCommandDAO = injector.getInstance(HostRoleCommandDAO.class);
+    HostDAO hostDAO = injector.getInstance(HostDAO.class);
+    HostRoleCommandFactory hostRoleCommandFactory = injector.getInstance(HostRoleCommandFactory.class);
+
+    requestId = stageEntity.getRequestId();
+    stageId = stageEntity.getStageId();
+    logDir = stageEntity.getLogInfo();
+    clusterName = stageEntity.getCluster().getClusterName();
+
+    for (HostEntity hostEntity : hostDAO.findByStage(stageEntity)) {
+      List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findSortedCommandsByStageAndHost(stageEntity, hostEntity);
+      commandsToSend.put(hostEntity.getHostName(), new ArrayList<ExecutionCommand>());
+      hostRoleCommands.put(hostEntity.getHostName(), new TreeMap<String, HostRoleCommand>());
+      for (HostRoleCommandEntity command : commands) {
+        HostRoleCommand hostRoleCommand = hostRoleCommandFactory.createExisting(command);
+        hostRoleCommands.get(hostEntity.getHostName()).put(hostRoleCommand.getRole().toString(), hostRoleCommand);
+        commandsToSend.get(hostEntity.getHostName()).add(hostRoleCommand.getExecutionCommand());
+      }
+    }
+
+    for (RoleSuccessCriteriaEntity successCriteriaEntity : stageEntity.getRoleSuccessCriterias()) {
+      successFactors.put(successCriteriaEntity.getRole(), successCriteriaEntity.getSuccessFactor().floatValue());
+    }
+  }
+
+  /**
+   * Creates object to be persisted in database
+   * @return StageEntity
+   */
+  public StageEntity constructNewPersistenceEntity() {
+    StageEntity stageEntity = new StageEntity();
+    stageEntity.setRequestId(requestId);
+    stageEntity.setStageId(stageId);
+    stageEntity.setLogInfo(logDir);
+    stageEntity.setHostRoleCommands(new ArrayList<HostRoleCommandEntity>());
+    stageEntity.setRoleSuccessCriterias(new ArrayList<RoleSuccessCriteriaEntity>());
+
+    for (Role role : successFactors.keySet()) {
+      RoleSuccessCriteriaEntity roleSuccessCriteriaEntity = new RoleSuccessCriteriaEntity();
+      roleSuccessCriteriaEntity.setRole(role);
+      roleSuccessCriteriaEntity.setStage(stageEntity);
+      roleSuccessCriteriaEntity.setSuccessFactor(successFactors.get(role).doubleValue());
+      stageEntity.getRoleSuccessCriterias().add(roleSuccessCriteriaEntity);
+    }
+    return stageEntity;
+  }
+
+  public List<HostRoleCommand> getOrderedHostRoleCommands() {
+    List<HostRoleCommand> commands = new ArrayList<HostRoleCommand>();
+    //TODO trick for proper storing order, check it
+    for (String hostName : hostRoleCommands.keySet()) {
+      for (ExecutionCommand executionCommand : commandsToSend.get(hostName)) {
+        for (HostRoleCommand hostRoleCommand : hostRoleCommands.get(hostName).values()) {
+          if (hostRoleCommand.getExecutionCommand() == executionCommand) {
+            commands.add(hostRoleCommand);
+          }
+        }
+      }
+    }
+    return commands;
+  }
+
   public synchronized void setStageId(long stageId) {
     if (this.stageId != -1) {
       throw new RuntimeException("Attempt to set stageId again! Not allowed.");
@@ -85,6 +168,7 @@ public class Stage {
         +", event: "+event+", clusterName: "+clusterName+", serviceName: "+serviceName);
     HostRoleCommand hrc = new HostRoleCommand(host, role, event);
     ExecutionCommand cmd = new ExecutionCommand();
+    hrc.setExecutionCommand(cmd);
     cmd.setHostname(host);
     cmd.setClusterName(clusterName);
     cmd.setServiceName(serviceName);
@@ -261,11 +345,12 @@ public class Stage {
     if (hostRoleCommands.get(hostname) == null) {
       hostRoleCommands.put(hostname, new TreeMap<String, HostRoleCommand>());
     }
+    //TODO add reference to ExecutionCommand into HostRoleCommand
     hostRoleCommands.get(hostname).put(role,
         origStage.getHostRoleCommand(hostname, role));
   }
 
-  private HostRoleCommand getHostRoleCommand(String hostname, String role) {
+  HostRoleCommand getHostRoleCommand(String hostname, String role) {
     return hostRoleCommands.get(hostname).get(role);
   }
   

+ 28 - 0
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/StageFactory.java

@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.actionmanager;
+
+import org.apache.ambari.server.orm.entities.StageEntity;
+
+public interface StageFactory {
+
+  Stage createExisting(String actionId);
+
+  Stage createExisting(StageEntity stageEntity);
+}

+ 12 - 1
ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java

@@ -41,6 +41,7 @@ public class ExecutionCommand extends AgentCommand {
     super(AgentCommandType.EXECUTION_COMMAND);
   }
   private String clusterName;
+  private int taskId;
   private String commandId;
   private String hostname;
   private Role role;
@@ -88,7 +89,17 @@ public class ExecutionCommand extends AgentCommand {
   public int hashCode() {
     return (hostname + commandId + role).hashCode();
   }
-    
+
+  @JsonProperty("taskId")
+  public int getTaskId() {
+    return taskId;
+  }
+
+  @JsonProperty("taskId")
+  public void setTaskId(int taskId) {
+    this.taskId = taskId;
+  }
+
   @JsonProperty("role")
   public Role getRole() {
     return role;

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/agent/rest/AgentResource.java

@@ -26,7 +26,6 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Request;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.HeartBeat;
@@ -37,6 +36,7 @@ import org.apache.ambari.server.agent.RegistrationResponse;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitonException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 import com.google.inject.Inject;
 
 /**

+ 117 - 32
ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java

@@ -18,6 +18,7 @@
 
 package org.apache.ambari.server.api.services;
 
+import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.state.PropertyInfo;
 import org.apache.ambari.server.state.RepositoryInfo;
 import org.apache.ambari.server.state.ServiceInfo;
@@ -32,11 +33,15 @@ import java.util.Map;
 
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.ParserConfigurationException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
 import org.w3c.dom.NodeList;
 import org.w3c.dom.Node;
 import org.w3c.dom.Element;
+import org.xml.sax.SAXException;
 
 /**
  * ServiceInfo responsible getting information about cluster.
@@ -44,7 +49,36 @@ import org.w3c.dom.Element;
 @Path("/metainfo/")
 public class AmbariMetaInfo {
 
-  private String CONFIG_FILE_PATH = "C:\\workspace\\stacks";//Configuration.CONFIG_FILE
+  List<StackInfo> stacksResult = new ArrayList<StackInfo>();
+
+  private final static Logger log = LoggerFactory.getLogger(AmbariMetaInfo.class);
+
+  private static final String SERVICES_FOLDER_NAME = "services";
+  private static final String SERVICE_METAINFO_FILE_NAME = "metainfo.xml";
+  private static final String SERVICE_CONFIG_FOLDER_NAME = "configs";
+  private static final String SERVICE_CONFIG_FILE_NAME_POSTFIX = "-site.xml";
+
+  private static final String REPOSITORY_FILE_NAME = "repoinfo.xml";
+  private static final String REPOSITORY_FOLDER_NAME = "repos";
+  private static final String REPOSITORY_XML_MAIN_BLOCK_NAME = "repo";
+  private static final String REPOSITORY_XML_PROPERTY_URL = "url";
+  private static final String REPOSITORY_XML_PROPERTY_OS = "os";
+  private static final String REPOSITORY_XML_PROPERTY_DESCRIPTION = "description";
+
+  private static final String METAINFO_XML_MAIN_BLOCK_NAME = "metainfo";
+  private static final String METAINFO_XML_PROPERTY_VERSION = "version";
+  private static final String METAINFO_XML_PROPERTY_USER = "user";
+  private static final String METAINFO_XML_PROPERTY_COMMENT = "comment";
+
+  private static final String PROPERTY_XML_MAIN_BLOCK_NAME = "property";
+  private static final String PROPERTY_XML_PROPERTY_NAME = "name";
+  private static final String PROPERTY_XML_PROPERTY_VALUE = "value";
+  private static final String PROPERTY_XML_PROPERTY_DESCRIPTION = "description";
+
+
+  public AmbariMetaInfo() throws Exception {
+    getConfigurationInformation();
+  }
 
   List<ServiceInfo> getSupportedServices(String stackName, String version) {
     return null;
@@ -62,12 +96,13 @@ public class AmbariMetaInfo {
     return null;
   }
 
-  private List<ServiceInfo> getConfigurationInformation() throws Exception {
-    List<StackInfo> stacksResult = new ArrayList<StackInfo>();
+  private void getConfigurationInformation() throws Exception {
 
-    File stackRoot = new File(CONFIG_FILE_PATH);
-    if (!stackRoot.isDirectory())
-      throw new IOException("" + CONFIG_FILE_PATH + " should be a directory with stack.");
+
+    File stackRoot = new File(new Configuration().getMetadataPath());
+//    File stackRoot = new File("src/main/resources/stacks");
+    if (!stackRoot.isDirectory() && !stackRoot.exists())
+      throw new IOException("" + Configuration.METADETA_DIR_PATH + " should be a directory with stack.");
     File[] stacks = stackRoot.listFiles();
     for (File stackFolder : stacks) {
       if (stackFolder.isFile()) continue;
@@ -79,7 +114,7 @@ public class AmbariMetaInfo {
         stackInfo.setVersion(stack.getName());
         stacksResult.add(stackInfo);
         //get repository data for current stack of techs
-        File repositoryFolder = new File(stack.getAbsolutePath() + File.separator + "repos" + File.separator + "repoinfo.xml");
+        File repositoryFolder = new File(stack.getAbsolutePath() + File.separator + REPOSITORY_FOLDER_NAME + File.separator + REPOSITORY_FILE_NAME);
 
         if (repositoryFolder.exists()) {
           List<RepositoryInfo> repositoryInfoList = getRepository(repositoryFolder);
@@ -87,8 +122,8 @@ public class AmbariMetaInfo {
         }
 
 
-        //get services for this stack
-        File servicesRootFolder = new File(stack.getAbsolutePath() + File.separator + "services");
+        //Get services for this stack
+        File servicesRootFolder = new File(stack.getAbsolutePath() + File.separator + SERVICES_FOLDER_NAME);
         File[] servicesFolders = servicesRootFolder.listFiles();
 
         if (servicesFolders != null)
@@ -98,37 +133,43 @@ public class AmbariMetaInfo {
             serviceInfo.setName(serviceFolder.getName());
             stackInfo.getServices().add(serviceInfo);
 
+            //Get metainfo data from metainfo.xml
+            File metainfoFile = new File(serviceFolder.getAbsolutePath() + File.separator + SERVICE_METAINFO_FILE_NAME);
+            if (metainfoFile.exists()) {
+              setMetaInfo(metainfoFile, serviceInfo);
+
+            }
+
 
             //Get all properties from all "configs/*-site.xml" files
-            File serviceConfigFolder = new File(serviceFolder.getAbsolutePath() + File.separator + "configs");
+            File serviceConfigFolder = new File(serviceFolder.getAbsolutePath() + File.separator + SERVICE_CONFIG_FOLDER_NAME);
             File[] configFiles = serviceConfigFolder.listFiles();
             for (File config : configFiles) {
-              if (config.getName().endsWith("-site.xml")) {
+              if (config.getName().endsWith(SERVICE_CONFIG_FILE_NAME_POSTFIX)) {
                 serviceInfo.getProperties().addAll(getProperties(config));
               }
-
             }
           }
 
       }
     }//stack root
 
-    for (StackInfo elem : stacksResult) {
-      System.out.println("###elem = \n" + elem);
-      System.out.println("contain services= " + elem.getServices().size());
-    }
-    System.out.println(" \n\n\n ");
+//////TODO delete before final commit. Show all objects structure for debug
+//    for (StackInfo elem : stacksResult) {
+//      log.info("###elem = \n" + elem);
+//      log.info("contain services= " + elem.getServices().size());
+//      System.out.println("###elem = \n" + elem);
+//      System.out.println("contain services= " + elem.getServices().size());
+//    }
+//    System.out.println(" \n\n\n ");
 
 
-    return null;
   }
 
 
   public static void main(String[] args) throws Exception {
     AmbariMetaInfo metadata = new AmbariMetaInfo();
-    metadata.getConfigurationInformation();
-//    metadata.getRepository(new File("C:\\workspace\\stacks\\HDP\\0.1\\repos\\repoinfo.xml"));
-
+//    System.out.println( new Configuration().getMetadataPath() );
   }
 
 
@@ -142,7 +183,7 @@ public class AmbariMetaInfo {
       Document doc = dBuilder.parse(repositoryFile);
       doc.getDocumentElement().normalize();
 
-      NodeList propertyNodes = doc.getElementsByTagName("repo");
+      NodeList propertyNodes = doc.getElementsByTagName(REPOSITORY_XML_MAIN_BLOCK_NAME);
 
       for (int index = 0; index < propertyNodes.getLength(); index++) {
 
@@ -151,21 +192,58 @@ public class AmbariMetaInfo {
 
           Element property = (Element) node;
           RepositoryInfo repositoryInfo = new RepositoryInfo();
-          repositoryInfo.setUrl(getTagValue("url", property));
-          repositoryInfo.setOs(getTagValue("os", property));
-          repositoryInfo.setDescription(getTagValue("description", property));
+          repositoryInfo.setUrl(getTagValue(REPOSITORY_XML_PROPERTY_URL, property));
+          repositoryInfo.setOs(getTagValue(REPOSITORY_XML_PROPERTY_OS, property));
+          repositoryInfo.setDescription(getTagValue(REPOSITORY_XML_PROPERTY_DESCRIPTION, property));
           repositorysInfo.add(repositoryInfo);
         }
       }
 
     } catch (Exception e) {
       e.printStackTrace();
-
     }
+
     return repositorysInfo;
   }
 
-  public List<PropertyInfo> getProperties(File propertyFile) {
+
+  private void setMetaInfo(File metainfoFile, ServiceInfo serviceInfo) {
+
+    DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
+
+    Document doc = null;
+    DocumentBuilder dBuilder = null;
+    try {
+      dBuilder = dbFactory.newDocumentBuilder();
+      doc = dBuilder.parse(metainfoFile);
+    } catch (SAXException e) {
+      log.error("Error while parsing metainf.xml", e);
+    } catch (IOException e) {
+      log.error("Error while open metainf.xml", e);
+    } catch (ParserConfigurationException e) {
+      log.error("Error while parsing metainf.xml", e);
+    }
+
+    doc.getDocumentElement().normalize();
+
+    NodeList metaInfoNodes = doc.getElementsByTagName(METAINFO_XML_MAIN_BLOCK_NAME);
+
+    if (metaInfoNodes.getLength() > 0) {
+      Node metaInfoNode = metaInfoNodes.item(0);
+      if (metaInfoNode.getNodeType() == Node.ELEMENT_NODE) {
+
+        Element metaInfoElem = (Element) metaInfoNode;
+
+        serviceInfo.setVersion(getTagValue(METAINFO_XML_PROPERTY_VERSION, metaInfoElem));
+        serviceInfo.setUser(getTagValue(METAINFO_XML_PROPERTY_USER, metaInfoElem));
+        serviceInfo.setComment(getTagValue(METAINFO_XML_PROPERTY_COMMENT, metaInfoElem));
+      }
+    }
+
+  }
+
+
+  private List<PropertyInfo> getProperties(File propertyFile) {
 
     List<PropertyInfo> resultPropertyList = new ArrayList<PropertyInfo>();
     try {
@@ -175,7 +253,7 @@ public class AmbariMetaInfo {
       doc.getDocumentElement().normalize();
 
 
-      NodeList propertyNodes = doc.getElementsByTagName("property");
+      NodeList propertyNodes = doc.getElementsByTagName(PROPERTY_XML_MAIN_BLOCK_NAME);
 
       for (int index = 0; index < propertyNodes.getLength(); index++) {
 
@@ -184,9 +262,9 @@ public class AmbariMetaInfo {
 
           Element property = (Element) node;
           PropertyInfo propertyInfo = new PropertyInfo();
-          propertyInfo.setName(getTagValue("name", property));
-          propertyInfo.setValue(getTagValue("value", property));
-          propertyInfo.setDescription(getTagValue("description", property));
+          propertyInfo.setName(getTagValue(PROPERTY_XML_PROPERTY_NAME, property));
+          propertyInfo.setValue(getTagValue(PROPERTY_XML_PROPERTY_VALUE, property));
+          propertyInfo.setDescription(getTagValue(PROPERTY_XML_PROPERTY_DESCRIPTION, property));
 
           if (propertyInfo.getName() == null || propertyInfo.getValue() == null)
             continue;
@@ -201,17 +279,24 @@ public class AmbariMetaInfo {
     return resultPropertyList;
   }
 
-  private static String getTagValue(String sTag, Element rawElement) {
+
+  private String getTagValue(String sTag, Element rawElement) {
     String result = null;
     try {
       NodeList element = rawElement.getElementsByTagName(sTag).item(0).getChildNodes();
       Node value = (Node) element.item(0);
       result = value.getNodeValue();
+    } catch (NullPointerException e) {
+      log.debug("There is no field like " + sTag + "in this DOM element.", e);
+    } catch (Exception e) {
+      log.error("Error while getting value from xml DOM element", e);
+      throw e;
     } finally {
       return result;
     }
 
   }
+
 }
 
 

+ 11 - 0
ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java

@@ -59,6 +59,8 @@ public class Configuration {
       "security.server.passphrase_env_var";
   public static final String PASSPHRASE_KEY = "security.server.passphrase";
   public static final String RESOURCES_DIR_KEY = "resources.dir";
+  public static final String METADETA_DIR_PATH = "metadata.path";
+
 
   public static final String CLIENT_SECURITY_KEY = "client.security";
   public static final String LDAP_USE_SSL_KEY = "authorization.ldap.useSSL";
@@ -245,6 +247,15 @@ public class Configuration {
     properties.setProperty(CLIENT_SECURITY_KEY, type.toString());
   }
 
+  /**
+   * Gets ambari stack-path
+   * @return String
+   */
+  public String getMetadataPath() {
+    return properties.getProperty(METADETA_DIR_PATH);
+//    return "src/main/resources/stacks";
+  }
+
   public PersistenceType getPersistenceType() {
     String value = properties.getProperty(PERSISTENCE_IN_MEMORY_KEY, PERSISTENCE_IN_MEMORY_DEFAULT);
     if ("true".equalsIgnoreCase(value)) {

+ 4 - 3
ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java

@@ -21,8 +21,7 @@ import com.google.gson.Gson;
 import com.google.inject.Scopes;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.persist.jpa.JpaPersistModule;
-import org.apache.ambari.server.actionmanager.ActionDBAccessor;
-import org.apache.ambari.server.actionmanager.ActionDBInMemoryImpl;
+import org.apache.ambari.server.actionmanager.*;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.state.*;
 import org.apache.ambari.server.state.cluster.ClusterFactory;
@@ -65,10 +64,12 @@ public class ControllerModule extends AbstractModule {
     install(new FactoryModuleBuilder().implement(ServiceComponent.class, ServiceComponentImpl.class).build(ServiceComponentFactory.class));
     install(new FactoryModuleBuilder().implement(ServiceComponentHost.class, ServiceComponentHostImpl.class).build(ServiceComponentHostFactory.class));
     install(new FactoryModuleBuilder().implement(Config.class, ConfigImpl.class).build(ConfigFactory.class));
+    install(new FactoryModuleBuilder().build(StageFactory.class));
+    install(new FactoryModuleBuilder().build(HostRoleCommandFactory.class));
 
     bind(Gson.class).in(Scopes.SINGLETON);
     bind(Clusters.class).to(ClustersImpl.class);
-    bind(ActionDBAccessor.class).to(ActionDBInMemoryImpl.class);
+    bind(ActionDBAccessor.class).to(ActionDBAccessorImpl.class);
     bindConstant().annotatedWith(Names.named("schedulerSleeptime")).to(10000L);
     bindConstant().annotatedWith(Names.named("actionTimeout")).to(60000L);
     bind(AmbariManagementController.class)

+ 0 - 61
ambari-server/src/main/java/org/apache/ambari/server/orm/dao/ActionStatusDAO.java

@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.server.orm.dao;
-
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.google.inject.persist.Transactional;
-import org.apache.ambari.server.orm.entities.ActionStatusEntity;
-import org.apache.ambari.server.orm.entities.ActionStatusEntityPK;
-
-import javax.persistence.EntityManager;
-
-public class ActionStatusDAO {
-  @Inject
-  Provider<EntityManager> entityManagerProvider;
-
-  public ActionStatusEntity findByPK(ActionStatusEntityPK actionStatusEntityPK) {
-    return entityManagerProvider.get().find(ActionStatusEntity.class, actionStatusEntityPK);
-  }
-
-  public void refresh(ActionStatusEntity actionStatusEntity) {
-    entityManagerProvider.get().refresh(actionStatusEntity);
-  }
-
-  @Transactional
-  public void create(ActionStatusEntity actionStatusEntity) {
-    entityManagerProvider.get().persist(actionStatusEntity);
-  }
-
-  @Transactional
-  public ActionStatusEntity merge(ActionStatusEntity actionStatusEntity) {
-    return entityManagerProvider.get().merge(actionStatusEntity);
-  }
-
-  @Transactional
-  public void remove(ActionStatusEntity actionStatusEntity) {
-    entityManagerProvider.get().remove(actionStatusEntity);
-  }
-
-  @Transactional
-  public void removeByPK(ActionStatusEntityPK actionStatusEntityPK) {
-    remove(findByPK(actionStatusEntityPK));
-  }
-
-}

+ 60 - 0
ambari-server/src/main/java/org/apache/ambari/server/orm/dao/DaoUtils.java

@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.orm.dao;
+
+import com.google.inject.Singleton;
+
+import javax.persistence.NoResultException;
+import javax.persistence.Query;
+import javax.persistence.TypedQuery;
+import java.util.Collections;
+import java.util.List;
+
+@Singleton
+public class DaoUtils {
+
+  public <T> List<T> selectList(TypedQuery<T> query, Object... parameters) {
+    setParameters(query, parameters);
+    try {
+      return query.getResultList();
+    } catch (NoResultException ignored) {
+      return Collections.emptyList();
+    }
+  }
+
+  public <T> T selectSingle(TypedQuery<T> query, Object... parameters) {
+    setParameters(query, parameters);
+    try {
+      return query.getSingleResult();
+    } catch (NoResultException ignored) {
+      return null;
+    }
+  }
+
+  public int executeUpdate(Query query, Object... parameters) {
+    setParameters(query, parameters);
+    return query.executeUpdate();
+  }
+
+  public void setParameters(Query query, Object... parameters) {
+    for (int i = 0; i < parameters.length; i++) {
+      query.setParameter(i+1, parameters[i]);
+    }
+  }
+}

+ 19 - 2
ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostDAO.java

@@ -21,12 +21,13 @@ package org.apache.ambari.server.orm.dao;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.persist.Transactional;
-import org.apache.ambari.server.orm.entities.ClusterEntity;
 import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
 
 import javax.persistence.EntityManager;
 import javax.persistence.NoResultException;
 import javax.persistence.TypedQuery;
+import java.util.Collections;
 import java.util.List;
 
 public class HostDAO {
@@ -40,7 +41,23 @@ public class HostDAO {
 
   public List<HostEntity> findAll() {
     TypedQuery<HostEntity> query = entityManagerProvider.get().createQuery("SELECT host FROM HostEntity host", HostEntity.class);
-    return query.getResultList();
+    try {
+      return query.getResultList();
+    } catch (NoResultException e) {
+      return Collections.emptyList();
+    }
+  }
+
+  public List<HostEntity> findByStage(StageEntity stageEntity) {
+    TypedQuery<HostEntity> query = entityManagerProvider.get().createQuery(
+        "SELECT DISTINCT host FROM HostEntity host JOIN host.hostRoleCommandEntities command JOIN command.stage stage " +
+            "WHERE stage=:stageEntity", HostEntity.class);
+    query.setParameter("stageEntity", stageEntity);
+    try {
+      return query.getResultList();
+    } catch (NoResultException e) {
+      return Collections.emptyList();
+    }
   }
 
   /**

+ 36 - 0
ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java

@@ -21,19 +21,55 @@ package org.apache.ambari.server.orm.dao;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.persist.Transactional;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.orm.entities.HostEntity;
 import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
 
 import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import javax.persistence.TypedQuery;
+import java.util.Collection;
+import java.util.List;
 
 public class HostRoleCommandDAO {
 
   @Inject
   Provider<EntityManager> entityManagerProvider;
+  @Inject
+  DaoUtils daoUtils;
 
   public HostRoleCommandEntity findByPK(int taskId) {
     return entityManagerProvider.get().find(HostRoleCommandEntity.class, taskId);
   }
 
+  public List<HostRoleCommandEntity> findSortedCommandsByStageAndHost(StageEntity stageEntity, HostEntity hostEntity) {
+    TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery("SELECT hostRoleCommand " +
+        "FROM HostRoleCommandEntity hostRoleCommand " +
+        "WHERE hostRoleCommand.stage=?1 AND hostRoleCommand.host=?2 " +
+        "ORDER BY hostRoleCommand.taskId", HostRoleCommandEntity.class);
+    return daoUtils.selectList(query, stageEntity, hostEntity);
+  }
+
+  public List<HostRoleCommandEntity> findByHostRole(String hostName, long requestId, long stageId, Role role) {
+    TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery("SELECT command " +
+        "FROM HostRoleCommandEntity command " +
+        "WHERE command.hostName=?1 AND command.requestId=?2 " +
+        "AND command.stageId=?3 AND command.role=?4", HostRoleCommandEntity.class);
+
+    return daoUtils.selectList(query, hostName, requestId, stageId, role);
+  }
+
+  @Transactional
+  public int updateStatusByRequestId(long requestId, HostRoleStatus target, Collection<HostRoleStatus> sources) {
+    Query query = entityManagerProvider.get().createQuery("UPDATE HostRoleCommandEntity command " +
+        "SET command.status=?1 " +
+        "WHERE command.requestId=?2 AND command.status IN ?3");
+
+    return daoUtils.executeUpdate(query, target, requestId, sources);
+  }
+
   @Transactional
   public void create(HostRoleCommandEntity stageEntity) {
     entityManagerProvider.get().persist(stageEntity);

+ 39 - 0
ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java

@@ -21,20 +21,59 @@ package org.apache.ambari.server.orm.dao;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.persist.Transactional;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.orm.entities.StageEntity;
 import org.apache.ambari.server.orm.entities.StageEntityPK;
+import org.apache.ambari.server.utils.StageUtils;
 
 import javax.persistence.EntityManager;
+import javax.persistence.TypedQuery;
+import java.util.Collection;
+import java.util.List;
 
 public class StageDAO {
 
   @Inject
   Provider<EntityManager> entityManagerProvider;
+  @Inject
+  DaoUtils daoUtils;
 
   public StageEntity findByPK(StageEntityPK stageEntityPK) {
     return entityManagerProvider.get().find(StageEntity.class, stageEntityPK);
   }
 
+  public long getLastRequestId() {
+    TypedQuery<Long> query = entityManagerProvider.get().createQuery("SELECT stage.requestId " +
+        "FROM StageEntity stage" +
+        " WHERE stage.requestId = (SELECT max(stage.requestId) FROM StageEntity stage)", Long.class);
+    Long result = daoUtils.selectSingle(query);
+    if (result != null) {
+      return result;
+    } else {
+      return -1;
+    }
+  }
+
+  public StageEntity findByActionId(String actionId) {
+    long[] ids = StageUtils.getRequestStage(actionId);
+    StageEntityPK pk = new StageEntityPK();
+    pk.setRequestId(ids[0]);
+    pk.setStageId(ids[1]);
+    return findByPK(pk);
+  }
+
+  public List<StageEntity> findByRequestId(long requestId) {
+    TypedQuery<StageEntity> query = entityManagerProvider.get().createQuery("SELECT stage FROM StageEntity stage WHERE stage.requestId=?1", StageEntity.class);
+    return daoUtils.selectList(query, requestId);
+  }
+
+  public List<StageEntity> findByCommandStatuses(Collection<HostRoleStatus> statuses) {
+    TypedQuery<StageEntity> query = entityManagerProvider.get().createQuery("SELECT stage " +
+        "FROM StageEntity stage JOIN stage.hostRoleCommands command " +
+        "WHERE command.status IN ?1", StageEntity.class);
+    return daoUtils.selectList(query, statuses);
+  }
+
   @Transactional
   public void create(StageEntity stageEntity) {
     entityManagerProvider.get().persist(stageEntity);

+ 0 - 209
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionStatusEntity.java

@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.server.orm.entities;
-
-import javax.persistence.Basic;
-import javax.persistence.Entity;
-import javax.persistence.Id;
-import javax.persistence.ManyToOne;
-import java.util.Arrays;
-
-@javax.persistence.IdClass(ActionStatusEntityPK.class)
-@javax.persistence.Table(name = "actionstatus", schema = "ambari", catalog = "")
-@Entity
-public class ActionStatusEntity {
-  private Long clusterId;
-
-  @javax.persistence.Column(name = "cluster_id", nullable = false, insertable = false, updatable = false, length = 10)
-  @Id
-  public Long getClusterId() {
-    return clusterId;
-  }
-
-  public void setClusterId(Long clusterId) {
-    this.clusterId = clusterId;
-  }
-
-  private String hostName = "";
-
-  @javax.persistence.Column(name = "host_name", nullable = false, insertable = false, updatable = false)
-  @Id
-  public String getHostName() {
-    return hostName;
-  }
-
-  public void setHostName(String hostName) {
-    this.hostName = hostName;
-  }
-
-  private String role = "";
-
-  @javax.persistence.Column(name = "role", nullable = false, insertable = true, updatable = true)
-  @Id
-  public String getRole() {
-    return role;
-  }
-
-  public void setRole(String role) {
-    this.role = role;
-  }
-
-  private Integer requestId = 0;
-
-  @javax.persistence.Column(name = "request_id", nullable = false, insertable = true, updatable = true, length = 10)
-  @Id
-  public int getRequestId() {
-    return requestId;
-  }
-
-  public void setRequestId(int requestId) {
-    this.requestId = requestId;
-  }
-
-  private Integer stageId = 0;
-
-  @javax.persistence.Column(name = "stage_id", nullable = false, insertable = true, updatable = true, length = 10)
-  @Id
-  public int getStageId() {
-    return stageId;
-  }
-
-  public void setStageId(int stageId) {
-    this.stageId = stageId;
-  }
-
-  private String event = "";
-
-  @javax.persistence.Column(name = "event", nullable = false, insertable = true, updatable = true)
-  @Basic
-  public String getEvent() {
-    return event;
-  }
-
-  public void setEvent(String event) {
-    this.event = event;
-  }
-
-  private Integer taskId = 0;
-
-  @javax.persistence.Column(name = "task_id", nullable = false, insertable = true, updatable = true, length = 10)
-  @Basic
-  public int getTaskId() {
-    return taskId;
-  }
-
-  public void setTaskId(int taskId) {
-    this.taskId = taskId;
-  }
-
-  private String status = "";
-
-  @javax.persistence.Column(name = "status", nullable = false, insertable = true, updatable = true)
-  @Basic
-  public String getStatus() {
-    return status;
-  }
-
-  public void setStatus(String status) {
-    this.status = status;
-  }
-
-  private String logInfo;
-
-  @javax.persistence.Column(name = "log_info", nullable = false, insertable = true, updatable = true)
-  @Basic
-  public String getLogInfo() {
-    return logInfo;
-  }
-
-  public void setLogInfo(String logInfo) {
-    this.logInfo = logInfo;
-  }
-
-  private byte[] continueCriteria;
-
-  @javax.persistence.Column(name = "continue_criteria", nullable = false, insertable = true, updatable = true)
-  @Basic
-  public byte[] getContinueCriteria() {
-    return continueCriteria;
-  }
-
-  public void setContinueCriteria(byte[] continueCriteria) {
-    this.continueCriteria = continueCriteria;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    ActionStatusEntity that = (ActionStatusEntity) o;
-
-    if (clusterId != null ? !clusterId.equals(that.clusterId) : that.clusterId != null) return false;
-    if (requestId != null ? !requestId.equals(that.requestId) : that.requestId != null) return false;
-    if (stageId != null ? !stageId.equals(that.stageId) : that.stageId != null) return false;
-    if (taskId != null ? !taskId.equals(that.taskId) : that.taskId != null) return false;
-    if (!Arrays.equals(continueCriteria, that.continueCriteria)) return false;
-    if (event != null ? !event.equals(that.event) : that.event != null) return false;
-    if (hostName != null ? !hostName.equals(that.hostName) : that.hostName != null) return false;
-    if (logInfo != null ? !logInfo.equals(that.logInfo) : that.logInfo != null) return false;
-    if (role != null ? !role.equals(that.role) : that.role != null) return false;
-    if (status != null ? !status.equals(that.status) : that.status != null) return false;
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = clusterId != null ? clusterId.intValue() : 0;
-    result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
-    result = 31 * result + (role != null ? role.hashCode() : 0);
-    result = 31 * result + (requestId !=null ? requestId : 0);
-    result = 31 * result + (stageId !=null ? stageId : 0);
-    result = 31 * result + (event != null ? event.hashCode() : 0);
-    result = 31 * result + (taskId !=null ? taskId : 0);
-    result = 31 * result + (status != null ? status.hashCode() : 0);
-    result = 31 * result + (logInfo != null ? logInfo.hashCode() : 0);
-    result = 31 * result + (continueCriteria != null ? Arrays.hashCode(continueCriteria) : 0);
-    return result;
-  }
-
-  private ClusterEntity clusterEntity;
-
-  @ManyToOne
-  @javax.persistence.JoinColumn(name = "cluster_id", referencedColumnName = "cluster_id", nullable = false)
-  public ClusterEntity getClusterEntity() {
-    return clusterEntity;
-  }
-
-  public void setClusterEntity(ClusterEntity clusterEntity) {
-    this.clusterEntity = clusterEntity;
-  }
-
-  private HostEntity hostEntity;
-
-  @ManyToOne
-  @javax.persistence.JoinColumn(name = "host_name", referencedColumnName = "host_name", nullable = false)
-  public HostEntity getHostEntity() {
-    return hostEntity;
-  }
-
-  public void setHostEntity(HostEntity hostEntity) {
-    this.hostEntity = hostEntity;
-  }
-}

+ 0 - 110
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ActionStatusEntityPK.java

@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.server.orm.entities;
-
-import javax.persistence.Column;
-import javax.persistence.Id;
-import java.io.Serializable;
-
-public class ActionStatusEntityPK implements Serializable {
-  private Long clusterId;
-
-  @Id
-  @Column(name = "cluster_id", nullable = false, insertable = true, updatable = true, length = 10)
-  public Long getClusterId() {
-    return clusterId;
-  }
-
-  public void setClusterId(Long clusterId) {
-    this.clusterId = clusterId;
-  }
-
-  private String hostName;
-
-  @Id
-  @Column(name = "host_name", nullable = false, insertable = true, updatable = true)
-  public String getHostName() {
-    return hostName;
-  }
-
-  public void setHostName(String hostName) {
-    this.hostName = hostName;
-  }
-
-  private String role;
-
-  @Id
-  @Column(name = "role", nullable = false, insertable = true, updatable = true)
-  public String getRole() {
-    return role;
-  }
-
-  public void setRole(String role) {
-    this.role = role;
-  }
-
-  private Integer requestId;
-
-  @Id
-  @Column(name = "request_id", nullable = false, insertable = true, updatable = true, length = 10)
-  public Integer getRequestId() {
-    return requestId;
-  }
-
-  public void setRequestId(Integer requestId) {
-    this.requestId = requestId;
-  }
-
-  private Integer stageId;
-
-  @Id
-  @Column(name = "stage_id", nullable = false, insertable = true, updatable = true, length = 10)
-  public Integer getStageId() {
-    return stageId;
-  }
-
-  public void setStageId(Integer stageId) {
-    this.stageId = stageId;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    ActionStatusEntityPK that = (ActionStatusEntityPK) o;
-
-    if (clusterId != null ? !clusterId.equals(that.clusterId) : that.clusterId != null) return false;
-    if (requestId != null ? !requestId.equals(that.requestId) : that.requestId != null) return false;
-    if (stageId != null ? !stageId.equals(that.stageId) : that.stageId != null) return false;
-    if (hostName != null ? !hostName.equals(that.hostName) : that.hostName != null) return false;
-    if (role != null ? !role.equals(that.role) : that.role != null) return false;
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = clusterId != null ? clusterId.intValue() : 0;
-    result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
-    result = 31 * result + (role != null ? role.hashCode() : 0);
-    result = 31 * result + (requestId !=null ? requestId : 0);
-    result = 31 * result + (stageId !=null ? stageId : 0);
-    return result;
-  }
-}

+ 2 - 1
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ClusterEntity.java

@@ -32,12 +32,13 @@ import java.util.Collection;
             "FROM ClusterEntity clusters")
 })
 @Entity
+@SequenceGenerator(name = "ambari.clusters_cluster_id_seq", allocationSize = 1)
 public class ClusterEntity {
   private Long clusterId;
 
   @javax.persistence.Column(name = "cluster_id", nullable = false, insertable = true, updatable = true)
   @Id
-  @GeneratedValue(strategy = GenerationType.AUTO)
+  @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "ambari.clusters_cluster_id_seq")
   public Long getClusterId() {
     return clusterId;
   }

+ 6 - 7
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java

@@ -19,7 +19,6 @@
 package org.apache.ambari.server.orm.entities;
 
 import javax.persistence.*;
-import java.util.Arrays;
 
 @Table(name = "execution_command", schema = "ambari", catalog = "")
 @Entity
@@ -36,15 +35,15 @@ public class ExecutionCommandEntity {
     this.taskId = taskId;
   }
 
-  private byte[] command;
+  private String command;
 
-  @Column(name = "command")
+  @Column(name = "command", length = 32000)
   @Basic
-  public byte[] getCommand() {
+  public String getCommand() {
     return command;
   }
 
-  public void setCommand(byte[] command) {
+  public void setCommand(String command) {
     this.command = command;
   }
 
@@ -55,7 +54,7 @@ public class ExecutionCommandEntity {
 
     ExecutionCommandEntity that = (ExecutionCommandEntity) o;
 
-    if (!Arrays.equals(command, that.command)) return false;
+    if (command != null ? !command.equals(that.command) : that.command != null) return false;
     if (taskId != null ? !taskId.equals(that.taskId) : that.taskId != null) return false;
 
     return true;
@@ -64,7 +63,7 @@ public class ExecutionCommandEntity {
   @Override
   public int hashCode() {
     int result = taskId != null ? taskId.hashCode() : 0;
-    result = 31 * result + (command != null ? Arrays.hashCode(command) : 0);
+    result = 31 * result + (command != null ? command.hashCode() : 0);
     return result;
   }
 

+ 2 - 1
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostComponentMappingEntity.java

@@ -23,6 +23,7 @@ import javax.persistence.*;
 @javax.persistence.IdClass(HostComponentMappingEntityPK.class)
 @javax.persistence.Table(name = "hostcomponentmapping", schema = "ambari", catalog = "")
 @Entity
+@SequenceGenerator(name = "ambari.hostcomponentmapping_host_component_mapping_id_seq", allocationSize = 1)
 public class HostComponentMappingEntity {
   private Long clusterId;
 
@@ -52,7 +53,7 @@ public class HostComponentMappingEntity {
 
   @javax.persistence.Column(name = "host_component_mapping_id", nullable = false, insertable = true, updatable = true, length = 10)
   @Id
-  @GeneratedValue(strategy = GenerationType.AUTO)
+  @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "ambari.hostcomponentmapping_host_component_mapping_id_seq")
   public Integer getHostComponentMappingId() {
     return hostComponentMappingId;
   }

+ 17 - 5
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostEntity.java

@@ -110,7 +110,8 @@ public class HostEntity {
 
   private String disksInfo = "";
 
-  @javax.persistence.Column(name = "disks_info", nullable = false, insertable = true, updatable = true)
+  @javax.persistence.Column(name = "disks_info", nullable = false, insertable = true, 
+		  updatable = true, length = 2000)
   @Basic
   public String getDisksInfo() {
     return disksInfo;
@@ -199,9 +200,9 @@ public class HostEntity {
 
     HostEntity that = (HostEntity) o;
 
-    if (cpuCount != that.cpuCount) return false;
-    if (lastRegistrationTime != that.lastRegistrationTime) return false;
-    if (totalMem != that.totalMem) return false;
+    if (cpuCount != null ? !cpuCount.equals(that.cpuCount) : that.cpuCount != null) return false;
+    if (lastRegistrationTime != null ? !lastRegistrationTime.equals(that.lastRegistrationTime) : that.lastRegistrationTime != null) return false;
+    if (totalMem != null ? !totalMem.equals(that.totalMem) : that.totalMem != null) return false;
     if (cpuInfo != null ? !cpuInfo.equals(that.cpuInfo) : that.cpuInfo != null) return false;
     if (discoveryStatus != null ? !discoveryStatus.equals(that.discoveryStatus) : that.discoveryStatus != null)
       return false;
@@ -297,7 +298,18 @@ public class HostEntity {
     this.serviceComponentHostConfigEntities = serviceComponentHostConfigEntities;
   }
 
-//  private Collection<ServiceComponentStateEntity> serviceComponentStateEntities;
+  private Collection<HostRoleCommandEntity> hostRoleCommandEntities;
+
+  @OneToMany(mappedBy = "host")
+  public Collection<HostRoleCommandEntity> getHostRoleCommandEntities() {
+    return hostRoleCommandEntities;
+  }
+
+  public void setHostRoleCommandEntities(Collection<HostRoleCommandEntity> hostRoleCommandEntities) {
+    this.hostRoleCommandEntities = hostRoleCommandEntities;
+  }
+
+  //  private Collection<ServiceComponentStateEntity> serviceComponentStateEntities;
 //
 //  @OneToMany(mappedBy = "hostEntity")
 //  public Collection<ServiceComponentStateEntity> getServiceComponentStateEntities() {

+ 25 - 20
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java

@@ -18,15 +18,20 @@
 
 package org.apache.ambari.server.orm.entities;
 
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+
 import javax.persistence.*;
 
 @Table(name = "host_role_command", schema = "ambari", catalog = "")
 @Entity
+@SequenceGenerator(name = "ambari.host_role_command_task_id_seq", allocationSize = 1)
 public class HostRoleCommandEntity {
   private Integer taskId;
 
   @Column(name = "task_id")
   @Id
+  @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "ambari.host_role_command_task_id_seq")
   public Integer getTaskId() {
     return taskId;
   }
@@ -35,27 +40,27 @@ public class HostRoleCommandEntity {
     this.taskId = taskId;
   }
 
-  private Integer requestId;
+  private Long requestId;
 
   @Column(name = "request_id", insertable = false, updatable = false, nullable = false)
   @Basic
-  public Integer getRequestId() {
+  public Long getRequestId() {
     return requestId;
   }
 
-  public void setRequestId(Integer requestId) {
+  public void setRequestId(Long requestId) {
     this.requestId = requestId;
   }
 
-  private Integer stageId;
+  private Long stageId;
 
   @Column(name = "stage_id", insertable = false, updatable = false, nullable = false)
   @Basic
-  public Integer getStageId() {
+  public Long getStageId() {
     return stageId;
   }
 
-  public void setStageId(Integer stageId) {
+  public void setStageId(Long stageId) {
     this.stageId = stageId;
   }
 
@@ -71,15 +76,15 @@ public class HostRoleCommandEntity {
     this.hostName = hostName;
   }
 
-  private String role = "";
+  private Role role;
 
-  @Column(name = "role", nullable = false)
-  @Basic
-  public String getRole() {
+  @Column(name = "role")
+  @Enumerated(EnumType.STRING)
+  public Role getRole() {
     return role;
   }
 
-  public void setRole(String role) {
+  public void setRole(Role role) {
     this.role = role;
   }
 
@@ -119,15 +124,15 @@ public class HostRoleCommandEntity {
     this.exitcode = exitcode;
   }
 
-  private String status = "";
+  private HostRoleStatus status;
 
-  @Column(name = "status", nullable = false)
-  @Basic
-  public String getStatus() {
+  @Column(name = "status")
+  @Enumerated(EnumType.STRING)
+  public HostRoleStatus getStatus() {
     return status;
   }
 
-  public void setStatus(String status) {
+  public void setStatus(HostRoleStatus status) {
     this.status = status;
   }
 
@@ -179,15 +184,15 @@ public class HostRoleCommandEntity {
     this.lastAttemptTime = lastAttemptTime;
   }
 
-  private Integer attemptCount = 0;
+  private Short attemptCount = 0;
 
   @Column(name = "attempt_count", nullable = false)
   @Basic
-  public Integer getAttemptCount() {
+  public Short getAttemptCount() {
     return attemptCount;
   }
 
-  public void setAttemptCount(Integer attemptCount) {
+  public void setAttemptCount(Short attemptCount) {
     this.attemptCount = attemptCount;
   }
 
@@ -270,4 +275,4 @@ public class HostRoleCommandEntity {
   public void setHost(HostEntity host) {
     this.host = host;
   }
-}
+}

+ 12 - 9
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java

@@ -18,45 +18,48 @@
 
 package org.apache.ambari.server.orm.entities;
 
+import org.apache.ambari.server.Role;
+
 import javax.persistence.*;
 
 @IdClass(org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntityPK.class)
 @Table(name = "role_success_criteria", schema = "ambari", catalog = "")
 @Entity
 public class RoleSuccessCriteriaEntity {
-  private Integer requestId;
+  private Long requestId;
 
   @Column(name = "request_id", insertable = false, updatable = false, nullable = false)
   @Id
-  public Integer getRequestId() {
+  public Long getRequestId() {
     return requestId;
   }
 
-  public void setRequestId(Integer requestId) {
+  public void setRequestId(Long requestId) {
     this.requestId = requestId;
   }
 
-  private Integer stageId;
+  private Long stageId;
 
   @Column(name = "stage_id", insertable = false, updatable = false, nullable = false)
   @Id
-  public Integer getStageId() {
+  public Long getStageId() {
     return stageId;
   }
 
-  public void setStageId(Integer stageId) {
+  public void setStageId(Long stageId) {
     this.stageId = stageId;
   }
 
-  private String role;
+  private Role role;
 
   @Column(name = "role")
+  @Enumerated(EnumType.STRING)
   @Id
-  public String getRole() {
+  public Role getRole() {
     return role;
   }
 
-  public void setRole(String role) {
+  public void setRole(Role role) {
     this.role = role;
   }
 

+ 15 - 10
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntityPK.java

@@ -18,44 +18,49 @@
 
 package org.apache.ambari.server.orm.entities;
 
+import org.apache.ambari.server.Role;
+
 import javax.persistence.Column;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
 import javax.persistence.Id;
 import java.io.Serializable;
 
 public class RoleSuccessCriteriaEntityPK implements Serializable {
-  private Integer requestId;
+  private Long requestId;
 
   @Id
   @Column(name = "request_id")
-  public Integer getRequestId() {
+  public Long getRequestId() {
     return requestId;
   }
 
-  public void setRequestId(Integer requestId) {
+  public void setRequestId(Long requestId) {
     this.requestId = requestId;
   }
 
-  private Integer stageId;
+  private Long stageId;
 
   @Id
   @Column(name = "stage_id")
-  public Integer getStageId() {
+  public Long getStageId() {
     return stageId;
   }
 
-  public void setStageId(Integer stageId) {
+  public void setStageId(Long stageId) {
     this.stageId = stageId;
   }
 
-  private String role;
+  private Role role;
 
-  @Id
   @Column(name = "role")
-  public String getRole() {
+  @Enumerated(EnumType.STRING)
+  @Id
+  public Role getRole() {
     return role;
   }
 
-  public void setRole(String role) {
+  public void setRole(Role role) {
     this.role = role;
   }
 

+ 2 - 1
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ServiceComponentConfigEntity.java

@@ -23,12 +23,13 @@ import java.util.Date;
 
 @javax.persistence.Table(name = "servicecomponentconfig", schema = "ambari", catalog = "")
 @Entity
+@SequenceGenerator(name = "ambari.servicecomponentconfig_config_version_seq", allocationSize = 1)
 public class ServiceComponentConfigEntity {
   private Integer configVersion;
 
   @javax.persistence.Column(name = "config_version", nullable = false, insertable = true, updatable = true, length = 10)
   @Id
-  @GeneratedValue(strategy = GenerationType.AUTO)
+  @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "ambari.servicecomponentconfig_config_version_seq")
   public Integer getConfigVersion() {
     return configVersion;
   }

+ 2 - 1
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ServiceComponentHostConfigEntity.java

@@ -23,12 +23,13 @@ import java.util.Date;
 
 @javax.persistence.Table(name = "servicecomponenthostconfig", schema = "ambari", catalog = "")
 @Entity
+@SequenceGenerator(name = "ambari.servicecomponenthostconfig_config_version_seq", allocationSize = 1)
 public class ServiceComponentHostConfigEntity {
   private Integer configVersion;
 
   @javax.persistence.Column(name = "config_version", nullable = false, insertable = true, updatable = true, length = 10)
   @Id
-  @GeneratedValue(strategy = GenerationType.AUTO)
+  @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "ambari.servicecomponenthostconfig_config_version_seq")
   public Integer getConfigVersion() {
     return configVersion;
   }

+ 2 - 1
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ServiceConfigEntity.java

@@ -23,12 +23,13 @@ import java.util.Date;
 
 @javax.persistence.Table(name = "serviceconfig", schema = "ambari", catalog = "")
 @Entity
+@SequenceGenerator(name = "ambari.serviceconfig_config_version_seq", allocationSize = 1)
 public class ServiceConfigEntity {
   private Integer configVersion;
 
   @javax.persistence.Column(name = "config_version", nullable = false, insertable = true, updatable = true, length = 10)
   @Id
-  @GeneratedValue(strategy = GenerationType.AUTO)
+  @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "ambari.serviceconfig_config_version_seq")
   public Integer getConfigVersion() {
     return configVersion;
   }

+ 9 - 10
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java

@@ -25,40 +25,39 @@ import java.util.Collection;
 @Table(name = "stage", schema = "ambari", catalog = "")
 @Entity
 public class StageEntity {
-  private Integer clusterId;
+  private Long clusterId;
 
   @Column(name = "cluster_id", insertable = false, updatable = false, nullable = false)
   @Basic
-  public Integer getClusterId() {
+  public Long getClusterId() {
     return clusterId;
   }
 
-  public void setClusterId(Integer clusterId) {
+  public void setClusterId(Long clusterId) {
     this.clusterId = clusterId;
   }
 
-  private Integer requestId;
+  private Long requestId;
 
   @Column(name = "request_id")
   @Id
-  //TODO auto generated? @GeneratedValue(strategy = GenerationType.AUTO)
-  public Integer getRequestId() {
+  public Long getRequestId() {
     return requestId;
   }
 
-  public void setRequestId(Integer requestId) {
+  public void setRequestId(Long requestId) {
     this.requestId = requestId;
   }
 
-  private Integer stageId = 0;
+  private Long stageId = 0L;
 
   @Column(name = "stage_id", nullable = false)
   @Id
-  public Integer getStageId() {
+  public Long getStageId() {
     return stageId;
   }
 
-  public void setStageId(Integer stageId) {
+  public void setStageId(Long stageId) {
     this.stageId = stageId;
   }
 

+ 6 - 6
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntityPK.java

@@ -23,27 +23,27 @@ import javax.persistence.Id;
 import java.io.Serializable;
 
 public class StageEntityPK implements Serializable {
-  private Integer requestId;
+  private Long requestId;
 
   @Id
   @Column(name = "request_id")
-  public Integer getRequestId() {
+  public Long getRequestId() {
     return requestId;
   }
 
-  public void setRequestId(Integer requestId) {
+  public void setRequestId(Long requestId) {
     this.requestId = requestId;
   }
 
-  private Integer stageId;
+  private Long stageId;
 
   @Id
   @Column(name = "stage_id")
-  public Integer getStageId() {
+  public Long getStageId() {
     return stageId;
   }
 
-  public void setStageId(Integer stageId) {
+  public void setStageId(Long stageId) {
     this.stageId = stageId;
   }
 

+ 2 - 2
ambari-server/src/main/java/org/apache/ambari/server/security/CertificateManager.java

@@ -181,8 +181,8 @@ public class CertificateManager {
     String passphraseSrvr = configs.getConfigsMap().get(Configuration.
         PASSPHRASE_KEY);
 
-    System.out.println(passphraseSrvr);
-    System.out.println(passphraseAgent);
+    LOG.info("Pass phrase Server " + passphraseSrvr);
+    LOG.info("Pass Phrase Agent" + passphraseAgent);
 
     if (!passphraseSrvr.equals(passphraseAgent)) {
       LOG.warn("Incorrect passphrase from agent");

+ 30 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEvent.java

@@ -19,6 +19,9 @@
 package org.apache.ambari.server.state;
 
 import org.apache.ambari.server.state.fsm.event.AbstractEvent;
+import org.apache.ambari.server.state.svccomphost.*;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
 
 /**
  * Base class for all events that affect the ServiceComponentHost FSM
@@ -70,4 +73,31 @@ public abstract class ServiceComponentHostEvent
     return opTimestamp;
   }
 
+  @JsonCreator
+  public static ServiceComponentHostEvent create(@JsonProperty("type") ServiceComponentHostEventType type,
+                                                 @JsonProperty("serviceComponentName") String serviceComponentName,
+                                                 @JsonProperty("hostName") String hostName, @JsonProperty("opTimestamp") long opTimestamp) {
+    switch (type) {
+      case HOST_SVCCOMP_INSTALL:
+        return new ServiceComponentHostInstallEvent(serviceComponentName, hostName, opTimestamp);
+      case HOST_SVCCOMP_OP_FAILED:
+        return new ServiceComponentHostOpFailedEvent(serviceComponentName, hostName, opTimestamp);
+      case HOST_SVCCOMP_OP_IN_PROGRESS:
+        return new ServiceComponentHostOpInProgressEvent(serviceComponentName, hostName, opTimestamp);
+      case HOST_SVCCOMP_OP_RESTART:
+        return new ServiceComponentHostOpRestartedEvent(serviceComponentName, hostName, opTimestamp);
+      case HOST_SVCCOMP_OP_SUCCEEDED:
+        return new ServiceComponentHostOpSucceededEvent(serviceComponentName, hostName, opTimestamp);
+      case HOST_SVCCOMP_START:
+        return new ServiceComponentHostStartEvent(serviceComponentName, hostName, opTimestamp);
+      case HOST_SVCCOMP_STOP:
+        return new ServiceComponentHostStopEvent(serviceComponentName, hostName, opTimestamp);
+      case HOST_SVCCOMP_UNINSTALL:
+        return new ServiceComponentHostUninstallEvent(serviceComponentName, hostName, opTimestamp);
+      case HOST_SVCCOMP_WIPEOUT:
+        return new ServiceComponentHostWipeoutEvent(serviceComponentName, hostName, opTimestamp);
+    }
+    return null;
+  }
+
 }

+ 3 - 3
ambari-server/src/main/java/org/apache/ambari/server/state/ServiceInfo.java

@@ -23,9 +23,9 @@ import java.util.List;
 
 public class ServiceInfo {
   private String name;
-  private String version;
-  private String user;
-  private String comment;
+    private String version;
+    private String user;
+    private String comment;
   private List<PropertyInfo> properties;
 
   public String getName() {

+ 8 - 0
ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java

@@ -116,4 +116,12 @@ public class StageUtils {
     InputStream is = new ByteArrayInputStream(json.getBytes());
     return mapper.readValue(is, ExecutionCommand.class);
   }
+
+  public static <T> T fromJson(String json, Class<T> clazz) throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true);
+    mapper.configure(SerializationConfig.Feature.USE_ANNOTATIONS, true);
+    InputStream is = new ByteArrayInputStream(json.getBytes());
+    return mapper.readValue(is, clazz);
+  }
 }

+ 3 - 3
ambari-server/src/main/python/ambari-server-state/core-site.xml

@@ -60,7 +60,7 @@
     <property>
         <name>fs.default.name</name>
         <!-- cluster variant -->
-        <value>hdfs://hdp1.cybervisiontech.com.ua:8020</value>
+        <value>hdfs://hdp1:8020</value>
         <description>The name of the default file system. Either the
             literal string "local" or a host:port for NDFS.
         </description>
@@ -240,7 +240,7 @@
 
     <property>
         <name>hadoop.proxyuser.oozie.hosts</name>
-        <value>hdp1.cybervisiontech.com.ua</value>
+        <value>hdp1</value>
         <description>
             Proxy host for Hadoop.
         </description>
@@ -256,7 +256,7 @@
 
     <property>
         <name>hadoop.proxyuser.templeton.hosts</name>
-        <value>hdp1.cybervisiontech.com.ua</value>
+        <value>hdp1</value>
         <description>
             Proxy host for templeton.
         </description>

+ 23 - 22
ambari-server/src/main/resources/Ambari-DDL.sql

@@ -60,7 +60,7 @@ FOREIGN KEY(user_name, ldap_user) REFERENCES Users(user_name, ldap_user)
 /* Overall clusters table - all created/managed clusters */
 CREATE TABLE Clusters
 (
-cluster_id SERIAL,
+cluster_id BIGSERIAL,
 cluster_name VARCHAR UNIQUE NOT NULL,
 desired_cluster_state VARCHAR DEFAULT '' NOT NULL,
 cluster_info VARCHAR DEFAULT '' NOT NULL,
@@ -90,14 +90,14 @@ PRIMARY KEY (host_name)
 /* Cluster Hosts mapping table */
 CREATE TABLE ClusterHostMapping
 (
-  cluster_id INTEGER references Clusters(cluster_id),
+  cluster_id BIGINT references Clusters(cluster_id),
   host_name VARCHAR references Hosts(host_name),
   PRIMARY KEY(cluster_id, host_name)
 );
 
 CREATE TABLE ClusterServices
 (
-cluster_id INTEGER NOT NULL references Clusters(cluster_id),
+cluster_id BIGINT NOT NULL references Clusters(cluster_id),
 service_name VARCHAR,
 service_enabled INTEGER DEFAULT '0' NOT NULL,
 PRIMARY KEY (cluster_id,service_name)
@@ -110,7 +110,7 @@ PRIMARY KEY (cluster_id,service_name)
 CREATE TABLE ServiceConfig
 (
 config_version SERIAL /*INTEGER NOT NULL AUTO_INCREMENT*/,
-cluster_id INTEGER NOT NULL,
+cluster_id BIGINT NOT NULL,
 service_name VARCHAR NOT NULL,
 config_snapshot VARCHAR DEFAULT '' NOT NULL,
 config_snapshot_time timestamp NOT NULL,
@@ -125,7 +125,7 @@ FOREIGN KEY (cluster_id, service_name) REFERENCES ClusterServices(cluster_id, se
 CREATE TABLE ServiceComponentConfig
 (
 config_version SERIAL /*INTEGER NOT NULL AUTO_INCREMENT*/,
-cluster_id INTEGER NOT NULL,
+cluster_id BIGINT NOT NULL,
 service_name VARCHAR NOT NULL,
 component_name VARCHAR NOT NULL,
 config_snapshot VARCHAR DEFAULT '' NOT NULL,
@@ -138,7 +138,7 @@ FOREIGN KEY (cluster_id, service_name) REFERENCES ClusterServices(cluster_id, se
 CREATE TABLE ServiceComponentHostConfig
 (
 config_version SERIAL /*INTEGER NOT NULL AUTO_INCREMENT*/,
-cluster_id INTEGER NOT NULL,
+cluster_id BIGINT NOT NULL,
 service_name VARCHAR NOT NULL,
 component_name VARCHAR NOT NULL,
 host_name VARCHAR NOT NULL references Hosts(host_name),
@@ -150,7 +150,7 @@ FOREIGN KEY (cluster_id, service_name) REFERENCES ClusterServices(cluster_id, se
 
 CREATE TABLE ServiceDesiredState
 (
-cluster_id INTEGER,
+cluster_id BIGINT,
 service_name VARCHAR DEFAULT '' NOT NULL,
 desired_state VARCHAR DEFAULT '' NOT NULL,
 desired_host_role_mapping INTEGER DEFAULT '0' NOT NULL,
@@ -161,7 +161,7 @@ FOREIGN KEY (cluster_id, service_name) REFERENCES ClusterServices(cluster_id, se
 
 CREATE TABLE HostComponentMapping /*HostRoleMapping*/
 (
-cluster_id INTEGER,
+cluster_id BIGINT,
 service_name VARCHAR DEFAULT '' NOT NULL,
 host_component_mapping_id SERIAL /*INTEGER NOT NULL AUTO_INCREMENT*/,
 host_component_mapping_snapshot VARCHAR DEFAULT '' NOT NULL,
@@ -172,7 +172,7 @@ FOREIGN KEY (cluster_id, service_name) REFERENCES ClusterServices(cluster_id, se
 
 CREATE TABLE ClusterState
 (
-cluster_id INTEGER NOT NULL references Clusters(cluster_id),
+cluster_id BIGINT NOT NULL references Clusters(cluster_id),
 current_cluster_state VARCHAR DEFAULT '' NOT NULL,
 PRIMARY KEY (cluster_id)
 );
@@ -192,7 +192,7 @@ PRIMARY KEY (host_name)
 
 CREATE TABLE ServiceComponentDesiredState
 (
-cluster_id INTEGER references Clusters(cluster_id),
+cluster_id BIGINT references Clusters(cluster_id),
 service_name VARCHAR DEFAULT '' NOT NULL,
 component_name VARCHAR DEFAULT '' NOT NULL,
 desired_state VARCHAR DEFAULT '' NOT NULL,
@@ -204,7 +204,7 @@ FOREIGN KEY (cluster_id, service_name) REFERENCES ClusterServices(cluster_id, se
 
 CREATE TABLE HostComponentState
 (
-cluster_id INTEGER,
+cluster_id BIGINT,
 service_name VARCHAR DEFAULT '' NOT NULL,
 host_name VARCHAR DEFAULT '' NOT NULL references Hosts(host_name),
 component_name VARCHAR DEFAULT '' NOT NULL,
@@ -217,7 +217,7 @@ FOREIGN KEY (cluster_id, service_name, component_name) REFERENCES ServiceCompone
 
 CREATE TABLE HostComponentDesiredState
 (
-cluster_id INTEGER,
+cluster_id BIGINT,
 service_name VARCHAR DEFAULT '' NOT NULL,
 host_name VARCHAR NOT NULL references Hosts(host_name),
 component_name VARCHAR DEFAULT '' NOT NULL,
@@ -230,18 +230,18 @@ FOREIGN KEY (cluster_id, service_name, component_name) REFERENCES ServiceCompone
 
 CREATE TABLE STAGE
 (
-   cluster_id INTEGER references Clusters(cluster_id),
-   request_id SERIAL,
-   stage_id INTEGER DEFAULT '0' NOT NULL,
+   cluster_id BIGINT references Clusters(cluster_id),
+   request_id BIGINT DEFAULT '0',
+   stage_id BIGINT DEFAULT '0' NOT NULL,
    log_info VARCHAR DEFAULT '' NOT NULL,
    PRIMARY KEY (request_id, stage_id)
 );
 
 CREATE TABLE HOST_ROLE_COMMAND
 (
-   task_id INTEGER DEFAULT '0' NOT NULL,
-   request_id INTEGER NOT NULL,
-   stage_id INTEGER NOT NULL,
+   task_id SERIAL NOT NULL,
+   request_id BIGINT NOT NULL,
+   stage_id BIGINT NOT NULL,
    host_name VARCHAR DEFAULT '' NOT NULL references Hosts(host_name),
    role VARCHAR DEFAULT '' NOT NULL,
    command VARCHAR DEFAULT '' NOT NULL,
@@ -252,7 +252,7 @@ CREATE TABLE HOST_ROLE_COMMAND
    std_out VARCHAR DEFAULT '' NOT NULL,
    start_time BIGINT DEFAULT -1 NOT NULL,
    last_attempt_time BIGINT DEFAULT -1 NOT NULL,
-   attempt_count INTEGER DEFAULT 0 NOT NULL,
+   attempt_count SMALLINT DEFAULT 0 NOT NULL,
    PRIMARY KEY (task_id),
    FOREIGN KEY (request_id, stage_id) REFERENCES STAGE(request_id, stage_id)
 );
@@ -260,15 +260,15 @@ CREATE TABLE HOST_ROLE_COMMAND
 CREATE TABLE EXECUTION_COMMAND
 (
    task_id INTEGER DEFAULT '0' NOT NULL references HOST_ROLE_COMMAND(task_id),
-   command bytea NOT NULL, /** Serialized ExecutionCommand **/
+   command VARCHAR NOT NULL, /** Serialized ExecutionCommand **/
    PRIMARY KEY(task_id)
 );
 
 
 CREATE TABLE ROLE_SUCCESS_CRITERIA
 (
-   request_id INTEGER NOT NULL,
-   stage_id INTEGER NOT NULL,
+   request_id BIGINT NOT NULL,
+   stage_id BIGINT NOT NULL,
    role VARCHAR DEFAULT '' NOT NULL,
    success_factor FLOAT DEFAULT 1,
    PRIMARY KEY(role, request_id, stage_id),
@@ -292,6 +292,7 @@ CREATE TABLE ROLE_SUCCESS_CRITERIA
 
 
 GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA ambari TO "ambari-server";
+GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA ambari TO "ambari-server";
 
 BEGIN;
 

+ 1 - 2
ambari-server/src/main/resources/META-INF/persistence.xml

@@ -77,8 +77,7 @@
       <property name="eclipselink.ddl-generation" value="drop-and-create-tables"/>
       <!--<property name="javax.persistence.jdbc.user" value="ambari-server"/>-->
       <!--<property name="javax.persistence.jdbc.password" value="bigdata"/>-->
+      <property name="eclipselink.orm.throw.exceptions" value="true"/>
     </properties>
   </persistence-unit>
-
-
 </persistence>

+ 4 - 0
ambari-server/src/main/resources/db/newcerts/.gitignore

@@ -0,0 +1,4 @@
+# Ignore everything in this directory
+*
+# Except this file
+!.gitignore

+ 12 - 0
ambari-server/src/main/resources/stacks/HDP/0.1/repos/repoinfo.xml

@@ -0,0 +1,12 @@
+<?xml version="1.0"?>
+<reposinfo>
+    <repo>
+        <url>url</url>
+        <os>Centos 5</os>
+        <description>...</description>
+    </repo>
+    <repo>
+        <url>url</url>
+        <os>Centos 6</os>
+    </repo>
+</reposinfo>

+ 6 - 0
ambari-server/src/main/resources/stacks/HDP/0.1/services/HDFS/metainfo.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0"?>
+<metainfo>
+    <user>root</user>
+    <comment>This is comment for HDFS service</comment>
+    <version>1.0</version>
+</metainfo>

+ 144 - 0
ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java

@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.actionmanager;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.persist.PersistService;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.agent.ActionQueue;
+import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.orm.GuiceJpaInitializer;
+import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent;
+import org.apache.ambari.server.utils.StageUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestActionDBAccessorImpl {
+  private static final Logger log = LoggerFactory.getLogger(TestActionDBAccessorImpl.class);
+
+  private long requestId = 23;
+  private long stageId = 31;
+  private String hostName = "host1";
+  private String clusterName = "cluster1";
+  private Injector injector;
+  ActionDBAccessor db;
+  ActionManager am;
+
+  @Inject
+  private Clusters clusters;
+  @Inject
+  private ExecutionCommandDAO executionCommandDAO;
+  @Inject
+  private HostRoleCommandDAO hostRoleCommandDAO;
+
+  @Before
+  public void setup() throws AmbariException {
+    injector = Guice.createInjector(new InMemoryDefaultTestModule());
+    injector.getInstance(GuiceJpaInitializer.class);
+    injector.injectMembers(this);
+    clusters.addHost(hostName);
+    clusters.getHost(hostName).persist();
+    clusters.addCluster(clusterName);
+    db = injector.getInstance(ActionDBAccessorImpl.class);
+    am = new ActionManager(5000, 1200000, new ActionQueue(), clusters, db);
+  }
+
+  @After
+  public void tearDown() throws AmbariException {
+    injector.getInstance(PersistService.class).stop();
+  }
+
+  @Test
+  public void testActionResponse() {
+    String hostname = "host1";
+    populateActionDB(db, hostname);
+    List<CommandReport> reports = new ArrayList<CommandReport>();
+    CommandReport cr = new CommandReport();
+    cr.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr.setRole("HBASE_MASTER");
+    cr.setStatus("COMPLETED");
+    cr.setStdErr("");
+    cr.setStdOut("");
+    cr.setExitCode(215);
+    reports.add(cr);
+    am.actionResponse(hostname, reports);
+    assertEquals(215,
+        am.getAction(requestId, stageId).getExitCode(hostname, "HBASE_MASTER"));
+    assertEquals(HostRoleStatus.COMPLETED, am.getAction(requestId, stageId)
+        .getHostRoleStatus(hostname, "HBASE_MASTER"));
+  }
+
+  @Test
+  public void testPersistActions() {
+    populateActionDB(db, hostName);
+    for (Stage stage : db.getAllStages(requestId)) {
+      log.info("taskId={}", stage.getExecutionCommands(hostName).get(0).getTaskId());
+      assertTrue(stage.getExecutionCommands(hostName).get(0).getTaskId() != -1);
+      log.info(executionCommandDAO.findByPK(stage.getExecutionCommands(hostName).get(0).getTaskId()).getCommand());
+    }
+  }
+
+  @Test
+  public void testHostRoleScheduled() {
+    populateActionDB(db, hostName);
+    Stage stage = db.getAction(StageUtils.getActionId(requestId, stageId));
+    assertEquals(HostRoleStatus.PENDING, stage.getHostRoleStatus(hostName, Role.HBASE_MASTER.toString()));
+    List<HostRoleCommandEntity> entities=
+        hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER);
+
+    assertEquals(HostRoleStatus.PENDING, entities.get(0).getStatus());
+    stage.setHostRoleStatus(hostName, Role.HBASE_MASTER.toString(), HostRoleStatus.QUEUED);
+
+    entities = hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER);
+    assertEquals(HostRoleStatus.QUEUED, stage.getHostRoleStatus(hostName, Role.HBASE_MASTER.toString()));
+    assertEquals(HostRoleStatus.PENDING, entities.get(0).getStatus());
+    db.hostRoleScheduled(stage, hostName, Role.HBASE_MASTER.toString());
+
+    entities = hostRoleCommandDAO.findByHostRole(hostName, requestId, stageId, Role.HBASE_MASTER);
+    assertEquals(HostRoleStatus.QUEUED, entities.get(0).getStatus());
+  }
+
+  private void populateActionDB(ActionDBAccessor db, String hostname) {
+    Stage s = new Stage(requestId, "/a/b", "cluster1");
+    s.setStageId(stageId);
+    s.addHostRoleExecutionCommand(hostname, Role.HBASE_MASTER,
+        RoleCommand.START,
+        new ServiceComponentHostStartEvent(Role.HBASE_MASTER.toString(),
+            hostname, System.currentTimeMillis()), "cluster1", "HBASE");
+    List<Stage> stages = new ArrayList<Stage>();
+    stages.add(s);
+    db.persistActions(stages);
+  }
+}

+ 50 - 12
ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java

@@ -23,15 +23,14 @@ import com.google.inject.Injector;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
 import com.google.inject.persist.Transactional;
-import org.apache.ambari.server.orm.dao.ClusterDAO;
-import org.apache.ambari.server.orm.dao.RoleDAO;
-import org.apache.ambari.server.orm.dao.UserDAO;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.orm.dao.*;
 import org.apache.ambari.server.orm.entities.*;
 import org.apache.ambari.server.state.HostState;
 import org.springframework.security.crypto.password.PasswordEncoder;
 
 import javax.persistence.EntityManager;
-import javax.persistence.Query;
 import java.util.*;
 
 @Singleton
@@ -143,15 +142,54 @@ public class OrmTestHelper {
     getEntityManager().getTransaction().setRollbackOnly();
   }
 
-  public int getClusterSizeByHostName(String hostName) {
-
-    Query query = getEntityManager().createQuery(
-            "SELECT host2 from HostEntity host join host.clusterEntities clusters join clusters.hostEntities host2 where host.hostName=:hostName");
-    query.setParameter("hostName", hostName);
-
-    Collection hosts = query.getResultList();
+  @Transactional
+  public void createStageCommands() {
+    ClusterDAO clusterDAO = injector.getInstance(ClusterDAO.class);
+    StageDAO stageDAO = injector.getInstance(StageDAO.class);
+    HostRoleCommandDAO hostRoleCommandDAO = injector.getInstance(HostRoleCommandDAO.class);
+    HostDAO hostDAO = injector.getInstance(HostDAO.class);
+    StageEntity stageEntity = new StageEntity();
+    stageEntity.setCluster(clusterDAO.findByName("test_cluster1"));
+    stageEntity.setRequestId(0L);
+    stageEntity.setStageId(0L);
+
+    HostRoleCommandEntity commandEntity = new HostRoleCommandEntity();
+    HostRoleCommandEntity commandEntity2 = new HostRoleCommandEntity();
+    HostRoleCommandEntity commandEntity3 = new HostRoleCommandEntity();
+    HostEntity host1 = hostDAO.findByName("test_host1");
+    HostEntity host2 = hostDAO.findByName("test_host2");
+    commandEntity.setHost(host1);
+    host1.getHostRoleCommandEntities().add(commandEntity);
+    commandEntity.setHostName("test_host1");
+    commandEntity.setCommand("cmd1");
+    commandEntity.setStatus(HostRoleStatus.QUEUED);
+    commandEntity.setRole(Role.DATANODE);
+    commandEntity2.setHost(host2);
+    host2.getHostRoleCommandEntities().add(commandEntity2);
+    commandEntity2.setCommand("cmd2");
+    commandEntity2.setRole(Role.NAMENODE);
+    commandEntity2.setStatus(HostRoleStatus.COMPLETED);
+    commandEntity3.setHost(host1);
+    host1.getHostRoleCommandEntities().add(commandEntity3);
+    commandEntity3.setCommand("cmd3");
+    commandEntity3.setRole(Role.SECONDARY_NAMENODE);
+    commandEntity3.setStatus(HostRoleStatus.IN_PROGRESS);
+    commandEntity.setStage(stageEntity);
+    commandEntity2.setStage(stageEntity);
+    commandEntity3.setStage(stageEntity);
+
+    stageEntity.setHostRoleCommands(new ArrayList<HostRoleCommandEntity>());
+    stageEntity.getHostRoleCommands().add(commandEntity);
+    stageEntity.getHostRoleCommands().add(commandEntity2);
+    stageEntity.getHostRoleCommands().add(commandEntity3);
+
+    stageDAO.create(stageEntity);
+    hostRoleCommandDAO.create(commandEntity3);
+    hostRoleCommandDAO.create(commandEntity);
+    hostRoleCommandDAO.create(commandEntity2);
+    hostDAO.merge(host1);
+    hostDAO.merge(host2);
 
-    return hosts.size();
   }
 
 }

+ 68 - 18
ambari-server/src/test/java/org/apache/ambari/server/orm/TestOrmImpl.java

@@ -20,38 +20,39 @@ package org.apache.ambari.server.orm;
 
 import com.google.inject.Guice;
 import com.google.inject.Injector;
-import com.google.inject.persist.jpa.JpaPersistModule;
-import org.apache.ambari.server.orm.dao.ClusterDAO;
-import org.apache.ambari.server.orm.dao.ClusterServiceDAO;
-import org.apache.ambari.server.orm.dao.ServiceConfigDAO;
-import org.apache.ambari.server.orm.entities.ClusterEntity;
-import org.apache.ambari.server.orm.entities.ClusterServiceEntity;
-import org.apache.ambari.server.orm.entities.ServiceConfigEntity;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import com.google.inject.persist.PersistService;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.orm.dao.*;
+import org.apache.ambari.server.orm.entities.*;
+import org.junit.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.persistence.EntityManager;
 import javax.persistence.RollbackException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
 
 public class TestOrmImpl extends Assert {
-  private final static Log log = LogFactory.getLog(TestOrmImpl.class);
+  private static final Logger log = LoggerFactory.getLogger(TestOrmImpl.class);
 
   private static Injector injector;
 
-  @BeforeClass
-  public static void setUpClass() throws Exception {
-    injector = Guice.createInjector(new JpaPersistModule("ambari-javadb")); //used for injecting in-memory DB EntityManager
-//    injector = Guice.createInjector(new JpaPersistModule("ambari-postgres")); //for injecting
-    injector.getInstance(GuiceJpaInitializer.class); //needed by Guice-persist to work
+  @Before
+  public void setup() {
+    injector = Guice.createInjector(new InMemoryDefaultTestModule());
+    injector.getInstance(GuiceJpaInitializer.class);
     injector.getInstance(OrmTestHelper.class).createDefaultData();
   }
 
+  @After
+  public void teardown() {
+    injector.getInstance(PersistService.class).stop();
+  }
+
   /**
    * persistence provider is responsible for returning empty collection if relation doesn't exists
    */
@@ -184,4 +185,53 @@ public class TestOrmImpl extends Assert {
     clusterServiceDAO.remove(clusterServiceEntity);
   }
 
+  @Test
+  public void testSortedCommands() {
+    injector.getInstance(OrmTestHelper.class).createStageCommands();
+    HostRoleCommandDAO hostRoleCommandDAO = injector.getInstance(HostRoleCommandDAO.class);
+    HostDAO hostDAO = injector.getInstance(HostDAO.class);
+    StageDAO stageDAO = injector.getInstance(StageDAO.class);
+
+    List<HostRoleCommandEntity> list =
+        hostRoleCommandDAO.findSortedCommandsByStageAndHost(
+            stageDAO.findByActionId("0-0"), hostDAO.findByName("test_host1"));
+    log.info("command '{}' - taskId '{}'", list.get(0).getCommand(), list.get(0).getTaskId());
+    log.info("command '{}' - taskId '{}'", list.get(1).getCommand(), list.get(1).getTaskId());
+    assertTrue(list.get(0).getTaskId() < list.get(1).getTaskId());
+  }
+
+  @Test
+  public void testFindHostsByStage() {
+    injector.getInstance(OrmTestHelper.class).createStageCommands();
+    HostDAO hostDAO = injector.getInstance(HostDAO.class);
+    StageDAO stageDAO = injector.getInstance(StageDAO.class);
+    StageEntity stageEntity = stageDAO.findByActionId("0-0");
+    log.info("StageEntity {} {}", stageEntity.getRequestId(), stageEntity.getStageId());
+    List<HostEntity> hosts = hostDAO.findByStage(stageEntity);
+    assertEquals(2, hosts.size());
+  }
+
+  @Test
+  public void testAbortHostRoleCommands() {
+    injector.getInstance(OrmTestHelper.class).createStageCommands();
+    HostRoleCommandDAO hostRoleCommandDAO = injector.getInstance(HostRoleCommandDAO.class);
+    int result = hostRoleCommandDAO.updateStatusByRequestId(0L, HostRoleStatus.ABORTED, Arrays.asList(HostRoleStatus.QUEUED, HostRoleStatus.IN_PROGRESS, HostRoleStatus.PENDING));
+    assertEquals(2, result);
+  }
+
+  @Test
+  public void testFindStageByHostRole() {
+    injector.getInstance(OrmTestHelper.class).createStageCommands();
+    HostRoleCommandDAO hostRoleCommandDAO = injector.getInstance(HostRoleCommandDAO.class);
+    List<HostRoleCommandEntity> list = hostRoleCommandDAO.findByHostRole("test_host1", 0L, 0L, Role.DATANODE);
+    assertEquals(1, list.size());
+  }
+
+  @Test
+  public void testLastRequestId() {
+    injector.getInstance(OrmTestHelper.class).createStageCommands();
+    StageDAO stageDAO = injector.getInstance(StageDAO.class);
+    assertEquals(0L, stageDAO.getLastRequestId());
+  }
+
 }