Browse Source

AMBARI-8346. Upgrade Execute: define db schema to store upgrade artifacts (ncole)

Nate Cole 10 năm trước cách đây
mục cha
commit
a496f8bc10
27 tập tin đã thay đổi với 1016 bổ sung98 xóa
  1. 9 4
      ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeItemResourceProvider.java
  2. 306 22
      ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
  3. 35 15
      ambari-server/src/main/java/org/apache/ambari/server/orm/dao/UpgradeDAO.java
  4. 79 6
      ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java
  5. 105 8
      ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java
  6. 5 0
      ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
  7. 9 0
      ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Batch.java
  8. 9 0
      ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConditionalBatch.java
  9. 8 3
      ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigureTask.java
  10. 26 1
      ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/CountBatch.java
  11. 5 1
      ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ExecuteTask.java
  12. 5 1
      ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ManualTask.java
  13. 29 1
      ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/PercentBatch.java
  14. 8 4
      ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java
  15. 25 0
      ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog200.java
  16. 22 0
      ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
  17. 23 0
      ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
  18. 25 1
      ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
  19. 28 1
      ambari-server/src/main/resources/Ambari-DDL-Postgres-EMBEDDED-CREATE.sql
  20. 25 1
      ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
  21. 6 0
      ambari-server/src/main/resources/Ambari-DDL-SQLServer-DROP.sql
  22. 2 0
      ambari-server/src/main/resources/META-INF/persistence.xml
  23. 22 8
      ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml
  24. 182 0
      ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java
  25. 5 20
      ambari-server/src/test/java/org/apache/ambari/server/orm/dao/UpgradeDAOTest.java
  26. 1 1
      ambari-server/src/test/java/org/apache/ambari/server/state/stack/UpgradePackTest.java
  27. 12 0
      ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog200Test.java

+ 9 - 4
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeItemResourceProvider.java

@@ -18,9 +18,9 @@
 package org.apache.ambari.server.controller.internal;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -52,6 +52,7 @@ public class UpgradeItemResourceProvider extends AbstractControllerResourceProvi
   protected static final String UPGRADE_ID = "UpgradeItem/upgrade_id";
   protected static final String UPGRADE_ITEM_ID = "UpgradeItem/id";
   protected static final String UPGRADE_ITEM_STATE = "UpgradeItem/state";
+  protected static final String UPGRADE_ITEM_TEXT = "UpgradeItem/text";
 
   private static final Set<String> PK_PROPERTY_IDS = new HashSet<String>(
       Arrays.asList(UPGRADE_ID, UPGRADE_ITEM_ID));
@@ -69,6 +70,7 @@ public class UpgradeItemResourceProvider extends AbstractControllerResourceProvi
     PROPERTY_IDS.add(UPGRADE_ID);
     PROPERTY_IDS.add(UPGRADE_ITEM_ID);
     PROPERTY_IDS.add(UPGRADE_ITEM_STATE);
+    PROPERTY_IDS.add(UPGRADE_ITEM_TEXT);
 
     // keys
     KEY_PROPERTY_IDS.put(Resource.Type.UpgradeItem, UPGRADE_ITEM_ID);
@@ -114,9 +116,11 @@ public class UpgradeItemResourceProvider extends AbstractControllerResourceProvi
         throw new NoSuchResourceException(String.format("Cannot load upgrade for %s", upgradeIdStr));
       }
 
-      List<UpgradeItemEntity> items = upgrade.getUpgradeItems();
-      for (UpgradeItemEntity entity : items) {
-        results.add(toResource(entity, upgradeId, requestPropertyIds));
+      Collection<UpgradeItemEntity> items = upgrade.getUpgradeItems();
+      if (null != items) {
+        for (UpgradeItemEntity entity : items) {
+          results.add(toResource(entity, upgradeId, requestPropertyIds));
+        }
       }
     }
 
@@ -150,6 +154,7 @@ public class UpgradeItemResourceProvider extends AbstractControllerResourceProvi
     setResourceProperty(resource, UPGRADE_ID, Long.valueOf(upgradeId), requestedIds);
     setResourceProperty(resource, UPGRADE_ITEM_ID, entity.getId(), requestedIds);
     setResourceProperty(resource, UPGRADE_ITEM_STATE, entity.getState(), requestedIds);
+    setResourceProperty(resource, UPGRADE_ITEM_TEXT, entity.getText(), requestedIds);
 
     return resource;
   }

+ 306 - 22
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java

@@ -19,14 +19,19 @@ package org.apache.ambari.server.controller.internal;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.StaticallyInject;
+import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.RequestFactory;
+import org.apache.ambari.server.actionmanager.StageFactory;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.controller.spi.NoSuchParentResourceException;
@@ -38,12 +43,22 @@ import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException;
 import org.apache.ambari.server.controller.spi.SystemException;
 import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
+import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
 import org.apache.ambari.server.orm.dao.UpgradeDAO;
+import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
 import org.apache.ambari.server.orm.entities.UpgradeEntity;
 import org.apache.ambari.server.orm.entities.UpgradeItemEntity;
 import org.apache.ambari.server.state.Cluster;
-import org.apache.ambari.server.state.UpgradeState;
-
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.stack.UpgradePack;
+import org.apache.ambari.server.state.stack.UpgradePack.ProcessingComponent;
+import org.apache.ambari.server.state.stack.upgrade.CountBatch;
+import org.apache.ambari.server.state.stack.upgrade.PercentBatch;
+import org.apache.ambari.server.state.stack.upgrade.Task;
+
+import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 
@@ -64,9 +79,16 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
   private static final Map<Resource.Type, String> KEY_PROPERTY_IDS = new HashMap<Resource.Type, String>();
 
   @Inject
-  private static UpgradeDAO m_dao = null;
+  private static UpgradeDAO m_upgradeDAO = null;
   @Inject
   private static Provider<AmbariMetaInfo> m_metaProvider = null;
+  @Inject
+  private static RepositoryVersionDAO m_repoVersionDAO = null;
+  @Inject
+  private RequestFactory requestFactory;
+  @Inject
+  private StageFactory stageFactory;
+
 
   static {
     // properties
@@ -94,30 +116,26 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
       UnsupportedPropertyException, ResourceAlreadyExistsException,
       NoSuchParentResourceException {
 
+    Set<Map<String, Object>> requestMaps = request.getProperties();
 
-    UpgradeEntity entity = new UpgradeEntity();
-
-    m_dao.create(entity);
-
-    List<UpgradeItemEntity> items = new ArrayList<UpgradeItemEntity>();
-    UpgradeItemEntity item = new UpgradeItemEntity();
-    item.setId(Long.valueOf(entity.getId().longValue() + 1000));
-    item.setState(UpgradeState.IN_PROGRESS);
-    items.add(item);
+    if (requestMaps.size() > 1) {
+      throw new SystemException("Can only initiate one upgrade per request.");
+    }
 
-    item = new UpgradeItemEntity();
-    item.setId(Long.valueOf(entity.getId().longValue() + 1001));
-    item.setState(UpgradeState.PENDING);
-    items.add(item);
+    for (final Map<String, Object> requestMap : requestMaps) {
+      createResources(new Command<Void>() {
+        @Override
+        public Void invoke() throws AmbariException {
+          UpgradePack up = validateRequest(requestMap);
 
+          createUpgrade(up, requestMap);
 
-    entity.setUpgradeItems(items);
-
+          return null;
+        };
+      });
+    }
 
-    /*
     notifyCreate(Resource.Type.Upgrade, request);
-    */
-
     return getRequestStatus(null);
   }
 
@@ -143,7 +161,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
         throw new NoSuchResourceException(String.format("Cluster %s could not be loaded", clusterName));
       }
 
-      List<UpgradeEntity> upgrades = m_dao.findUpgrades(cluster.getClusterId());
+      List<UpgradeEntity> upgrades = m_upgradeDAO.findUpgrades(cluster.getClusterId());
 
       for (UpgradeEntity entity : upgrades) {
         results.add(toResource(entity, clusterName, requestPropertyIds));
@@ -193,4 +211,270 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     return resource;
   }
 
+  private UpgradePack validateRequest(Map<String, Object> requestMap) throws AmbariException {
+    String clusterName = (String) requestMap.get(UPGRADE_CLUSTER_NAME);
+    String version = (String) requestMap.get(UPGRADE_VERSION);
+
+    if (null == clusterName) {
+      throw new AmbariException(String.format("%s is required", UPGRADE_CLUSTER_NAME));
+    }
+
+    if (null == version) {
+      throw new AmbariException(String.format("%s is required", UPGRADE_VERSION));
+    }
+
+    Cluster cluster = getManagementController().getClusters().getCluster(clusterName);
+    StackId stack = cluster.getDesiredStackVersion();
+    RepositoryVersionEntity versionEntity = m_repoVersionDAO.findByStackAndVersion(
+        stack.getStackId(), version);
+
+    if (null == versionEntity) {
+      throw new AmbariException(String.format("Version %s for stack %s was not found",
+          version, stack.getStackVersion()));
+    }
+
+    Map<String, UpgradePack> packs = m_metaProvider.get().getUpgradePacks(
+        stack.getStackName(), stack.getStackVersion());
+
+    UpgradePack up = packs.get(versionEntity.getUpgradePackage());
+
+    if (null == up) {
+      throw new AmbariException(String.format(
+          "Upgrade pack %s not found", versionEntity.getUpgradePackage()));
+    }
+
+    // !!! validate all hosts have the version installed
+
+    return up;
+  }
+
+  private UpgradeEntity createUpgrade(UpgradePack pack, Map<String, Object> requestMap)
+    throws AmbariException {
+
+    String clusterName = (String) requestMap.get(UPGRADE_CLUSTER_NAME);
+
+    if (null == clusterName) {
+      throw new AmbariException(String.format("%s is required", UPGRADE_CLUSTER_NAME));
+    }
+
+    Cluster cluster = getManagementController().getClusters().getCluster(clusterName);
+    Map<String, Service> clusterServices = cluster.getServices();
+
+    Map<String, Map<String, ProcessingComponent>> tasks = pack.getTasks();
+
+    List<StageHolder> preUpgrades = new ArrayList<StageHolder>();
+    List<StageHolder> restart = new ArrayList<StageHolder>();
+    List<StageHolder> postUpgrades = new ArrayList<StageHolder>();
+
+    for (Entry<String, List<String>> entry : pack.getOrder().entrySet()) {
+      String serviceName = entry.getKey();
+      List<String> componentNames = entry.getValue();
+
+      // !!! if the upgrade pack doesn't define any tasks, skip
+      if (!tasks.containsKey(serviceName)) {
+        continue;
+      }
+
+      // !!! if the service isn't installed, skip
+      if (!clusterServices.containsKey(serviceName)) {
+        continue;
+      }
+
+      Service service = clusterServices.get(serviceName);
+      Map<String, ServiceComponent> components = service.getServiceComponents();
+
+      for (String componentName : componentNames) {
+        // !!! if the upgrade pack has no tasks for component, skip
+        if (!tasks.get(serviceName).containsKey(componentName)) {
+          continue;
+        }
+
+        // !!! if the component is not installed with the cluster, skip
+        if (!components.containsKey(componentName)) {
+          continue;
+        }
+
+        ProcessingComponent pc = tasks.get(serviceName).get(componentName);
+
+        List<Set<String>> groupings = computeHostGroupings(pc,
+            components.get(componentName).getServiceComponentHosts().keySet());
+
+        preUpgrades.addAll(buildUpgradeStages(pc, true, groupings));
+        restart.addAll(buildRollingRestart(pc, groupings));
+        postUpgrades.addAll(buildUpgradeStages(pc, false, groupings));
+      }
+    }
+
+
+    Gson gson = new Gson();
+
+    UpgradeEntity entity = new UpgradeEntity();
+
+    List<UpgradeItemEntity> items = new ArrayList<UpgradeItemEntity>();
+    for (StageHolder holder : preUpgrades) {
+      holder.upgradeItemEntity.setHosts(gson.toJson(holder.hosts));
+      holder.upgradeItemEntity.setTasks(gson.toJson(holder.taskHolder.tasks));
+      items.add(holder.upgradeItemEntity);
+    }
+
+    for (StageHolder holder : restart) {
+      holder.upgradeItemEntity.setHosts(gson.toJson(holder.hosts));
+      items.add(holder.upgradeItemEntity);
+    }
+
+    for (StageHolder holder : postUpgrades) {
+      holder.upgradeItemEntity.setHosts(gson.toJson(holder.hosts));
+      holder.upgradeItemEntity.setTasks(gson.toJson(holder.taskHolder.tasks));
+      items.add(holder.upgradeItemEntity);
+    }
+
+    entity.setClusterId(Long.valueOf(cluster.getClusterId()));
+    entity.setUpgradeItems(items);
+
+    m_upgradeDAO.create(entity);
+
+//    RequestStageContainer req = createRequest();
+//
+//    req.getRequestStatusResponse();
+
+//  TODO req.persist();
+    return entity;
+  }
+
+  private List<StageHolder> buildUpgradeStages(ProcessingComponent pc,
+      boolean preUpgrade, List<Set<String>> hostGroups) {
+
+    List<TaskHolder> taskHolders = buildStageStrategy(
+        preUpgrade ? pc.preTasks : pc.postTasks);
+
+    List<StageHolder> stages = new ArrayList<StageHolder>();
+
+    StringBuilder sb = new StringBuilder(preUpgrade ? "Preparing " : "Finalizing ");
+    sb.append("%s on %d host(s).  Phase %s/%s");
+    String textFormat = sb.toString();
+
+    for (TaskHolder taskHolder : taskHolders) {
+      int i = 1;
+      for (Set<String> hostGroup : hostGroups) {
+        StageHolder stage = new StageHolder();
+        stage.hosts = hostGroup;
+        stage.taskHolder = taskHolder;
+        stage.upgradeItemEntity = new UpgradeItemEntity();
+        stage.upgradeItemEntity.setText(String.format(textFormat,
+            pc.name,
+            Integer.valueOf(hostGroup.size()),
+            Integer.valueOf(i++),
+            Integer.valueOf(hostGroups.size())));
+        stages.add(stage);
+      }
+    }
+
+    return stages;
+  }
+
+  /**
+   * Builds the stages for the rolling restart portion
+   * @param pc the information from the upgrade pack
+   * @param hostGroups a list of the host groupings
+   * @return the list of stages that need to be created
+   */
+  private List<StageHolder> buildRollingRestart(ProcessingComponent pc, List<Set<String>> hostGroups) {
+    List<StageHolder> stages = new ArrayList<StageHolder>();
+
+    String textFormat = "Restarting %s on %d host(s), Phase %d/%d";
+
+    int i = 1;
+    for (Set<String> hostGroup : hostGroups) {
+      // !!! each of these is its own stage
+      StageHolder stage = new StageHolder();
+      stage.hosts = hostGroup;
+      stage.upgradeItemEntity = new UpgradeItemEntity();
+      stage.upgradeItemEntity.setText(String.format(textFormat, pc.name,
+          Integer.valueOf(hostGroup.size()),
+          Integer.valueOf(i++),
+          Integer.valueOf(hostGroups.size())));
+      stages.add(stage);
+    }
+
+    return stages;
+  }
+
+
+  /**
+   * Calculates how the hosts will be executing their upgrades.
+   */
+  private List<Set<String>> computeHostGroupings(ProcessingComponent taskBuckets, Set<String> allHosts) {
+    if (null == taskBuckets.batch) {
+      return Collections.singletonList(allHosts);
+    } else {
+      return taskBuckets.batch.getHostGroupings(allHosts);
+    }
+  }
+
+  /**
+   * For all the tasks for a component, separate out the manual from the
+   * automated steps into the stages they should executed.
+   *
+   * @param tasks a list of tasks
+   * @return the list of stages
+   */
+  private List<TaskHolder> buildStageStrategy(List<Task> tasks) {
+    if (null == tasks)
+      return Collections.emptyList();
+
+    List<TaskHolder> holders = new ArrayList<TaskHolder>();
+    TaskHolder holder = new TaskHolder();
+
+    holders.add(holder);
+    int i = 0;
+    for (Task t : tasks) {
+      // !!! TODO should every manual task get its own stage?
+      if (i > 0 && t.getType().isManual() != tasks.get(i-1).getType().isManual()) {
+        holder = new TaskHolder();
+        holders.add(holder);
+      }
+
+      holder.tasks.add(t);
+      i++;
+    }
+
+    return holders;
+  }
+
+  private static class TaskHolder {
+    private List<Task> tasks = new ArrayList<Task>();
+  }
+
+  private static class StageHolder {
+    private TaskHolder taskHolder;
+    private UpgradeItemEntity upgradeItemEntity;
+    private Set<String> hosts;
+
+    @Override
+    public String toString() {
+      // TODO Auto-generated method stub
+      return upgradeItemEntity.toString();
+    }
+  }
+
+
+  private RequestStageContainer createRequest() {
+
+    ActionManager actionManager = getManagementController().getActionManager();
+
+    RequestStageContainer requestStages = new RequestStageContainer(
+        actionManager.getNextRequestId(), null, requestFactory, actionManager);
+
+    /*
+    List<Stage> stages = doStageCreation(requestStages, cluster, changedServices, changedComponents,
+        changedHosts, requestParameters, requestProperties,
+        runSmokeTest, reconfigureClients);
+    LOG.debug("Created {} stages", ((stages != null) ? stages.size() : 0));
+
+    requestStages.addStages(stages);
+    updateServiceStates(changedServices, changedComponents, changedHosts, ignoredHosts);
+    */
+    return requestStages;
+  }
+
 }

+ 35 - 15
ambari-server/src/main/java/org/apache/ambari/server/orm/dao/UpgradeDAO.java

@@ -17,12 +17,12 @@
  */
 package org.apache.ambari.server.orm.dao;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
 
 import javax.persistence.EntityManager;
+import javax.persistence.TypedQuery;
 
+import org.apache.ambari.server.orm.RequiresSession;
 import org.apache.ambari.server.orm.entities.UpgradeEntity;
 
 import com.google.inject.Inject;
@@ -37,18 +37,23 @@ import com.google.inject.persist.Transactional;
 public class UpgradeDAO {
 
   @Inject
-  private Provider<EntityManager> m_entityManagerProvider;
+  private Provider<EntityManager> entityManagerProvider;
 
-  // !!! TODO tie into JPA framework.  For now, just return skeleton classes
-  private AtomicLong m_idGen = new AtomicLong(1L);
-  private List<UpgradeEntity> m_entities = new ArrayList<UpgradeEntity>();
+  @Inject
+  private DaoUtils daoUtils;
 
   /**
    * @param clusterId the cluster id
    * @return the list of upgrades initiated for the cluster
    */
+  @RequiresSession
   public List<UpgradeEntity> findUpgrades(long clusterId) {
-    return m_entities;
+    TypedQuery<UpgradeEntity> query = entityManagerProvider.get().createNamedQuery(
+        "UpgradeEntity.findAllForCluster", UpgradeEntity.class);
+
+    query.setParameter("clusterId", Long.valueOf(clusterId));
+
+    return daoUtils.selectList(query);
   }
 
   /**
@@ -56,22 +61,37 @@ public class UpgradeDAO {
    * @param upgradeId the id
    * @return the entity, or {@code null} if not found
    */
+  @RequiresSession
   public UpgradeEntity findUpgrade(long upgradeId) {
-    for (UpgradeEntity ue : m_entities) {
-      if (ue.getId().longValue() == upgradeId) {
-        return ue;
-      }
-    }
-    return null;
+    TypedQuery<UpgradeEntity> query = entityManagerProvider.get().createNamedQuery(
+        "UpgradeEntity.findUpgrade", UpgradeEntity.class);
+
+    query.setParameter("upgradeId", Long.valueOf(upgradeId));
+
+    return daoUtils.selectSingle(query);
   }
 
   /**
    * Creates the upgrade entity in the database
+   * @param entity the entity
    */
   @Transactional
   public void create(UpgradeEntity entity) {
-    entity.setId(Long.valueOf(m_idGen.getAndIncrement()));
-    m_entities.add(entity);
+    entityManagerProvider.get().persist(entity);
+  }
+
+  /**
+   * Removes all upgrades associated with the cluster.
+   * @param clusterId the cluster id
+   */
+  @Transactional
+  public void removeAll(long clusterId) {
+    List<UpgradeEntity> entities = findUpgrades(clusterId);
+
+    for (UpgradeEntity entity : entities) {
+      entityManagerProvider.get().remove(entity);
+    }
+
   }
 
 

+ 79 - 6
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java

@@ -17,42 +17,115 @@
  */
 package org.apache.ambari.server.orm.entities;
 
+import java.util.Collection;
 import java.util.List;
 
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.OneToMany;
+import javax.persistence.Table;
+import javax.persistence.TableGenerator;
+
+import org.apache.ambari.server.state.UpgradeState;
+
 /**
  * Models the data representation of an upgrade
  */
+@Table(name = "upgrade")
+@Entity
+@TableGenerator(name = "upgrade_id_generator",
+    table = "ambari_sequences", pkColumnName = "sequence_name", valueColumnName = "sequence_value",
+    pkColumnValue = "upgrade_id_seq", initialValue = 0, allocationSize = 1)
+@NamedQueries({
+  @NamedQuery(name = "UpgradeEntity.findAllForCluster",
+      query = "SELECT u FROM UpgradeEntity u WHERE u.clusterId = :clusterId"),
+  @NamedQuery(name = "UpgradeEntity.findUpgrade",
+      query = "SELECT u FROM UpgradeEntity u WHERE u.upgradeId = :upgradeId"),
+})
 public class UpgradeEntity {
 
-  private Long m_id = null;
-  private List<UpgradeItemEntity> m_items;
+  @Id
+  @Column(name = "upgrade_id", nullable = false, insertable = true, updatable = false)
+  @GeneratedValue(strategy = GenerationType.TABLE, generator = "upgrade_id_generator")
+  private Long upgradeId;
+
+  @Column(name = "cluster_id", nullable = false, insertable = true, updatable = false)
+  private Long clusterId;
+
+  @Enumerated(value=EnumType.STRING)
+  @Column(name = "state", nullable = false)
+  private UpgradeState state = UpgradeState.NONE;
+
+  @OneToMany(mappedBy = "upgradeEntity", cascade = { CascadeType.ALL })
+  private List<UpgradeItemEntity> upgradeItemEntities;
+
 
   /**
    * @return the id
    */
   public Long getId() {
-    return m_id;
+    return upgradeId;
   }
 
   /**
    * @param id the id
    */
   public void setId(Long id) {
-    m_id = id;
+    upgradeId = id;
   }
 
+  /**
+   * @return the cluster id
+   */
+  public Long getClusterId() {
+    return clusterId;
+  }
+
+  /**
+   * @param id the cluster id
+   */
+  public void setClusterId(Long id) {
+    clusterId = id;
+  }
+
+  /**
+   * @return the current state
+   */
+  public UpgradeState getState() {
+    return state;
+  }
+
+  /**
+   * @param state the new state
+   */
+  public void setState(UpgradeState state) {
+    this.state = state;
+  }
+
+
   /**
    * @return the upgrade items
    */
   public List<UpgradeItemEntity> getUpgradeItems() {
-    return m_items;
+    return upgradeItemEntities;
   }
 
   /**
    * @param items the upgrade items
    */
   public void setUpgradeItems(List<UpgradeItemEntity> items) {
-    m_items = items;
+    for (UpgradeItemEntity entity : items) {
+      entity.setUpgradeEntity(this);
+    }
+    upgradeItemEntities = items;
   }
 
 }

+ 105 - 8
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java

@@ -17,44 +17,141 @@
  */
 package org.apache.ambari.server.orm.entities;
 
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.ManyToOne;
+import javax.persistence.Table;
+import javax.persistence.TableGenerator;
+import javax.persistence.Transient;
+
 import org.apache.ambari.server.state.UpgradeState;
 
 /**
  * Models a single upgrade item as part of an entire {@link UpgradeEntity}
  */
+@Table(name = "upgrade_item")
+@Entity
+@TableGenerator(name = "upgrade_item_id_generator",
+    table = "ambari_sequences", pkColumnName = "sequence_name", valueColumnName = "sequence_value",
+    pkColumnValue = "upgrade_item_id_seq", initialValue = 0, allocationSize = 1)
 public class UpgradeItemEntity {
 
-  private Long m_id = null;
-  private UpgradeState m_state = UpgradeState.NONE;
+  @Id
+  @Column(name = "upgrade_item_id", nullable = false, insertable = true, updatable = false)
+  @GeneratedValue(strategy = GenerationType.TABLE, generator = "upgrade_item_id_generator")
+  private Long upgradeItemId;
+
+  @Column(name = "upgrade_id", nullable = false, insertable = false, updatable = false)
+  private Long upgradeId;
+
+  @Enumerated(value=EnumType.STRING)
+  @Column(name = "state", length=255, nullable = false)
+  private UpgradeState state = UpgradeState.NONE;
+
+  @Basic
+  @Column(name = "hosts")
+  private String hosts = null;
+
+  @Basic
+  @Column(name = "tasks")
+  private String tasks = null;
+
+  @Basic
+  @Column(name = "item_text", length = 1024)
+  private String itemText = null;
+
+  @ManyToOne
+  @JoinColumn(name = "upgrade_id", referencedColumnName = "upgrade_id", nullable = false)
+  private UpgradeEntity upgradeEntity;
+
 
   /**
    * @return the id
    */
   public Long getId() {
-    return m_id;
+    return upgradeItemId;
   }
 
   /**
    * @param id the id
    */
   public void setId(Long id) {
-    m_id = id;
+    upgradeItemId = id;
+  }
+
+  /**
+   * @return the state
+   */
+  public UpgradeState getState() {
+    return state;
   }
 
   /**
    * @param state the state
    */
   public void setState(UpgradeState state) {
-    m_state = state;
+    this.state = state;
   }
 
+
   /**
-   * @return the state
+   * @return the tasks in json format
    */
-  public UpgradeState getState() {
-    return m_state;
+  public String getTasks() {
+    return tasks;
+  }
+
+  /**
+   * @param json the tasks in json format
+   */
+  public void setTasks(String json) {
+    tasks = json;
   }
 
+  /**
+   * @return the hosts in json format
+   */
+  public String getHosts() {
+    return hosts;
+  }
+
+  /**
+   * @param json the hosts in json format
+   */
+  public void setHosts(String json) {
+    hosts = json;
+  }
+
+  /**
+   * @return the item text
+   */
+  public String getText() {
+    return itemText;
+  }
+
+  /**
+   * @param text the item text
+   */
+  public void setText(String text) {
+    itemText = text;
+  }
+
+
+
+  public UpgradeEntity getUpgradeEntity() {
+    return upgradeEntity;
+  }
+
+  public void setUpgradeEntity(UpgradeEntity entity) {
+    upgradeEntity = entity;
+  }
 
 
 }

+ 5 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java

@@ -57,6 +57,7 @@ import org.apache.ambari.server.orm.dao.ClusterStateDAO;
 import org.apache.ambari.server.orm.dao.ConfigGroupHostMappingDAO;
 import org.apache.ambari.server.orm.dao.HostConfigMappingDAO;
 import org.apache.ambari.server.orm.dao.ServiceConfigDAO;
+import org.apache.ambari.server.orm.dao.UpgradeDAO;
 import org.apache.ambari.server.orm.entities.ClusterConfigEntity;
 import org.apache.ambari.server.orm.entities.ClusterConfigMappingEntity;
 import org.apache.ambari.server.orm.entities.ClusterEntity;
@@ -193,6 +194,9 @@ public class ClusterImpl implements Cluster {
   @Inject
   private AlertDispatchDAO alertDispatchDAO;
 
+  @Inject
+  private UpgradeDAO upgradeDAO;
+
   private volatile boolean svcHostsLoaded = false;
 
   private volatile Multimap<String, String> serviceConfigTypes;
@@ -1377,6 +1381,7 @@ public class ClusterImpl implements Cluster {
     long clusterId = getClusterId();
     alertDefinitionDAO.removeAll(clusterId);
     alertDispatchDAO.removeAllGroups(clusterId);
+    upgradeDAO.removeAll(clusterId);
     clusterDAO.removeByPK(clusterId);
   }
 

+ 9 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Batch.java

@@ -17,6 +17,9 @@
  */
 package org.apache.ambari.server.state.stack.upgrade;
 
+import java.util.List;
+import java.util.Set;
+
 import javax.xml.bind.annotation.XmlSeeAlso;
 import javax.xml.bind.annotation.XmlType;
 
@@ -48,4 +51,10 @@ public abstract class Batch {
      */
     CONDITIONAL
   }
+
+  /**
+   * @param hosts all the hosts
+   * @return a list of host sets defined by the specific batching
+   */
+  public abstract List<Set<String>> getHostGroupings(Set<String> hosts);
 }

+ 9 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConditionalBatch.java

@@ -17,6 +17,10 @@
  */
 package org.apache.ambari.server.state.stack.upgrade;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
@@ -43,4 +47,9 @@ public class ConditionalBatch extends Batch {
     return Batch.Type.CONDITIONAL;
   }
 
+  @Override
+  public List<Set<String>> getHostGroupings(Set<String> hosts) {
+    // TODO
+    return Collections.singletonList(hosts);
+  }
 }

+ 8 - 3
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigureTask.java

@@ -21,6 +21,7 @@ import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
 import javax.xml.bind.annotation.XmlType;
 
 /**
@@ -31,8 +32,11 @@ import javax.xml.bind.annotation.XmlType;
 @XmlType(name="configure")
 public class ConfigureTask extends Task {
 
+  @XmlTransient
+  private Task.Type type = Task.Type.CONFIGURE;
+
   @XmlElement(name="type")
-  public String config;
+  public String configType;
 
   @XmlElement(name="key")
   public String key;
@@ -41,7 +45,8 @@ public class ConfigureTask extends Task {
   public String value;
 
   @Override
-  public Task.Type getType() {
-    return Task.Type.CONFIGURE;
+  public Type getType() {
+    return type;
   }
+
 }

+ 26 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/CountBatch.java

@@ -17,6 +17,11 @@
  */
 package org.apache.ambari.server.state.stack.upgrade;
 
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
@@ -32,11 +37,31 @@ import javax.xml.bind.annotation.XmlType;
 public class CountBatch extends Batch {
 
   @XmlElement(name="count")
-  public int count = 0;
+  public int count = 1;
 
   @Override
   public Type getType() {
     return Batch.Type.COUNT;
   }
 
+  /* (non-Javadoc)
+   * @see org.apache.ambari.server.state.stack.upgrade.Batch#getHostGroupings(java.util.Set)
+   */
+  @Override
+  public List<Set<String>> getHostGroupings(Set<String> hosts) {
+    List<Set<String>> groupings = new ArrayList<Set<String>>();
+
+    Set<String> set = new HashSet<String>();
+    groupings.add(set);
+    int i = 1;
+    for (String host : hosts) {
+      set.add(host);
+      if (i < hosts.size() && 0 == (i++ % count)) {
+        set = new HashSet<String>();
+        groupings.add(set);
+      }
+    }
+
+    return groupings;
+  }
 }

+ 5 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ExecuteTask.java

@@ -21,6 +21,7 @@ import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
 import javax.xml.bind.annotation.XmlType;
 
 /**
@@ -31,11 +32,14 @@ import javax.xml.bind.annotation.XmlType;
 @XmlType(name="execute")
 public class ExecuteTask extends Task {
 
+  @XmlTransient
+  private Task.Type type = Task.Type.EXECUTE;
+
   @XmlElement(name="command")
   public String command;
 
   @Override
   public Task.Type getType() {
-    return Task.Type.EXECUTE;
+    return type;
   }
 }

+ 5 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ManualTask.java

@@ -21,6 +21,7 @@ import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
 import javax.xml.bind.annotation.XmlType;
 
 /**
@@ -31,12 +32,15 @@ import javax.xml.bind.annotation.XmlType;
 @XmlType(name="manual")
 public class ManualTask extends Task {
 
+  @XmlTransient
+  private Task.Type type = Task.Type.MANUAL;
+
   @XmlElement(name="message")
   public String message;
 
   @Override
   public Task.Type getType() {
-    return Task.Type.MANUAL;
+    return type;
   }
 
 }

+ 29 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/PercentBatch.java

@@ -17,6 +17,11 @@
  */
 package org.apache.ambari.server.state.stack.upgrade;
 
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
@@ -33,11 +38,34 @@ import javax.xml.bind.annotation.XmlType;
 public class PercentBatch extends Batch {
 
   @XmlElement(name="percent")
-  public int count = 0;
+  public int percent = 100;
 
   @Override
   public Type getType() {
     return Batch.Type.PERCENT;
   }
 
+  @Override
+  public List<Set<String>> getHostGroupings(Set<String> hosts) {
+
+    List<Set<String>> groupings = new ArrayList<Set<String>>();
+
+    int count = Double.valueOf(Math.ceil(
+        (double) percent / 100 * hosts.size())).intValue();
+
+    Set<String> set = new HashSet<String>();
+    groupings.add(set);
+    int i = 1;
+    for (String host : hosts) {
+      set.add(host);
+      if (i < hosts.size() && 0 == (i++ % count)) {
+        set = new HashSet<String>();
+        groupings.add(set);
+      }
+    }
+
+    return groupings;
+  }
+
+
 }

+ 8 - 4
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java

@@ -18,14 +18,12 @@
 package org.apache.ambari.server.state.stack.upgrade;
 
 import javax.xml.bind.annotation.XmlSeeAlso;
-import javax.xml.bind.annotation.XmlType;
 
 
 /**
  * Base class to identify the items that could possibly occur during an upgrade
  */
 @XmlSeeAlso(value={ExecuteTask.class, ConfigureTask.class, ManualTask.class})
-@XmlType
 public abstract class Task {
 
   /**
@@ -33,7 +31,6 @@ public abstract class Task {
    */
   public abstract Type getType();
 
-
   /**
    * Identifies the type of task.
    */
@@ -49,6 +46,13 @@ public abstract class Task {
     /**
      * Task that displays a message and must be confirmed before continuing
      */
-    MANUAL
+    MANUAL;
+
+    /**
+     * @return {@code true} if the task is manual or automated.
+     */
+    public boolean isManual() {
+      return this == MANUAL;
+    }
   }
 }

+ 25 - 0
ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog200.java

@@ -26,6 +26,7 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.orm.DBAccessor;
 import org.apache.ambari.server.orm.DBAccessor.DBColumnInfo;
 import org.apache.ambari.server.orm.dao.DaoUtils;
+import org.apache.ambari.server.state.UpgradeState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -153,6 +154,30 @@ public class UpgradeCatalog200 extends AbstractUpgradeCatalog {
     // New sequences
     dbAccessor.executeQuery("INSERT INTO ambari_sequences(sequence_name, sequence_value) VALUES('cluster_version_id_seq', 0)", false);
     dbAccessor.executeQuery("INSERT INTO ambari_sequences(sequence_name, sequence_value) VALUES('host_version_id_seq', 0)", false);
+
+
+
+    // upgrade tables
+    columns = new ArrayList<DBColumnInfo>();
+    columns.add(new DBAccessor.DBColumnInfo("upgrade_id", Long.class, null, null, false));
+    columns.add(new DBAccessor.DBColumnInfo("cluster_id", Long.class, null, null, false));
+    columns.add(new DBAccessor.DBColumnInfo("state", String.class, 255, UpgradeState.NONE.name(), false));
+    dbAccessor.createTable("upgrade", columns, "upgrade_id");
+    dbAccessor.addFKConstraint("upgrade", "fk_upgrade_cluster_id", "cluster_id", "clusters", "cluster_id", false);
+    dbAccessor.executeQuery("INSERT INTO ambari_sequences(sequence_name, sequence_value) VALUES('upgrade_id_seq', 0)", false);
+
+    columns = new ArrayList<DBColumnInfo>();
+    columns.add(new DBAccessor.DBColumnInfo("upgrade_item_id", Long.class, null, null, false));
+    columns.add(new DBAccessor.DBColumnInfo("upgrade_id", Long.class, null, null, false));
+    columns.add(new DBAccessor.DBColumnInfo("state", String.class, 255, UpgradeState.NONE.name(), false));
+    columns.add(new DBAccessor.DBColumnInfo("hosts", char[].class, 32672, null, true));
+    columns.add(new DBAccessor.DBColumnInfo("tasks", char[].class, 32672, null, true));
+    columns.add(new DBAccessor.DBColumnInfo("item_text", String.class, 1024, null, true));
+    dbAccessor.createTable("upgrade_item", columns, "upgrade_item_id");
+    dbAccessor.addFKConstraint("upgrade", "fk_upgrade_item_upgrade_id", "upgrade_id", "upgrade", "upgrade_id", false);
+    dbAccessor.executeQuery("INSERT INTO ambari_sequences(sequence_name, sequence_value) VALUES('upgrade_item_id_seq', 0)", false);
+
+
   }
 
   // ----- UpgradeCatalog ----------------------------------------------------

+ 22 - 0
ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql

@@ -686,6 +686,26 @@ CREATE INDEX idx_alert_history_state on alert_history(alert_state);
 CREATE INDEX idx_alert_group_name on alert_group(group_name);
 CREATE INDEX idx_alert_notice_state on alert_notice(notify_state);
 
+-- upgrade tables
+CREATE TABLE upgrade (
+  upgrade_id BIGINT NOT NULL,
+  cluster_id BIGINT NOT NULL,
+  state VARCHAR(255) DEFAULT 'NONE' NOT NULL,
+  PRIMARY KEY (upgrade_id),
+  FOREIGN KEY (cluster_id) REFERENCES clusters(cluster_id)
+);
+
+CREATE TABLE upgrade_item (
+  upgrade_item_id BIGINT NOT NULL,
+  upgrade_id BIGINT NOT NULL,
+  state VARCHAR(255) DEFAULT 'NONE' NOT NULL,
+  hosts TEXT,
+  tasks TEXT,
+  item_text VARCHAR(1024),
+  PRIMARY KEY (upgrade_item_id),
+  FOREIGN KEY (upgrade_id) REFERENCES upgrade(upgrade_id)
+);
+
 -- In order for the first ID to be 1, must initialize the ambari_sequences table with a sequence_value of 0.
 INSERT INTO ambari_sequences(sequence_name, sequence_value) values ('cluster_id_seq', 1);
 INSERT INTO ambari_sequences(sequence_name, sequence_value) values ('host_role_command_id_seq', 1);
@@ -715,6 +735,8 @@ INSERT INTO ambari_sequences(sequence_name, sequence_value) values ('alert_histo
 INSERT INTO ambari_sequences(sequence_name, sequence_value) values ('alert_notice_id_seq', 0);
 INSERT INTO ambari_sequences(sequence_name, sequence_value) values ('alert_current_id_seq', 0);
 INSERT INTO ambari_sequences(sequence_name, sequence_value) values ('repo_version_id_seq', 0);
+INSERT INTO ambari_sequences(sequence_name, sequence_value) values ('upgrade_id_seq', 0);
+INSERT INTO ambari_sequences(sequence_name, sequence_value) values ('upgrade_item_id_seq', 0);
 
 insert into adminresourcetype (resource_type_id, resource_type_name)
   select 1, 'AMBARI'

+ 23 - 0
ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql

@@ -676,6 +676,27 @@ CREATE INDEX idx_alert_history_state on alert_history(alert_state);
 CREATE INDEX idx_alert_group_name on alert_group(group_name);
 CREATE INDEX idx_alert_notice_state on alert_notice(notify_state);
 
+-- upgrade tables
+CREATE TABLE upgrade (
+  upgrade_id BIGINT NOT NULL,
+  cluster_id BIGINT NOT NULL,
+  state VARCHAR2(255) DEFAULT 'NONE' NOT NULL,
+  PRIMARY KEY (upgrade_id),
+  FOREIGN KEY (cluster_id) REFERENCES clusters(cluster_id)
+);
+
+CREATE TABLE upgrade_item (
+  upgrade_item_id BIGINT NOT NULL,
+  upgrade_id BIGINT NOT NULL,
+  state VARCHAR2(255) DEFAULT 'NONE' NOT NULL,
+  hosts CLOB,
+  tasks CLOB,
+  item_text VARCHAR2(1024),
+  PRIMARY KEY (upgrade_item_id),
+  FOREIGN KEY (upgrade_id) REFERENCES upgrade(upgrade_id)
+);
+
+
 ---------inserting some data-----------
 -- In order for the first ID to be 1, must initialize the ambari_sequences table with a sequence_value of 0.
 INSERT INTO ambari_sequences(sequence_name, sequence_value) values ('host_role_command_id_seq', 0);
@@ -706,6 +727,8 @@ INSERT INTO ambari_sequences(sequence_name, sequence_value) values ('alert_histo
 INSERT INTO ambari_sequences(sequence_name, sequence_value) values ('alert_notice_id_seq', 0);
 INSERT INTO ambari_sequences(sequence_name, sequence_value) values ('alert_current_id_seq', 0);
 INSERT INTO ambari_sequences(sequence_name, sequence_value) values ('repo_version_id_seq', 0);
+INSERT INTO ambari_sequences(sequence_name, sequence_value) values ('upgrade_id_seq', 0);
+INSERT INTO ambari_sequences(sequence_name, sequence_value) values ('upgrade_item_id_seq', 0);
 
 INSERT INTO metainfo("metainfo_key", "metainfo_value") values ('version', '${ambariVersion}');
 

+ 25 - 1
ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql

@@ -673,6 +673,26 @@ CREATE INDEX idx_alert_history_state on alert_history(alert_state);
 CREATE INDEX idx_alert_group_name on alert_group(group_name);
 CREATE INDEX idx_alert_notice_state on alert_notice(notify_state);
 
+-- upgrade tables
+CREATE TABLE ambari.upgrade (
+  upgrade_id BIGINT NOT NULL,
+  cluster_id BIGINT NOT NULL,
+  state VARCHAR(255) DEFAULT 'NONE' NOT NULL,
+  PRIMARY KEY (upgrade_id),
+  FOREIGN KEY (cluster_id) REFERENCES ambari.clusters(cluster_id)
+);
+
+CREATE TABLE ambari.upgrade_item (
+  upgrade_item_id BIGINT NOT NULL,
+  upgrade_id BIGINT NOT NULL,
+  state VARCHAR(255) DEFAULT 'NONE' NOT NULL,
+  hosts TEXT,
+  tasks TEXT,
+  item_text VARCHAR(1024),
+  PRIMARY KEY (upgrade_item_id),
+  FOREIGN KEY (upgrade_id) REFERENCES ambari.upgrade(upgrade_id)
+);
+
 ---------inserting some data-----------
 -- In order for the first ID to be 1, must initialize the ambari_sequences table with a sequence_value of 0.
 BEGIN;
@@ -731,7 +751,11 @@ BEGIN;
   union all
   select 'host_version_id_seq', 0
   union all
-  select 'service_config_id_seq', 1;
+  select 'service_config_id_seq', 1
+  union all
+  select 'upgrade_id_seq', 0 
+  union all
+  select 'upgrade_item_id_seq', 0;
 
   INSERT INTO adminresourcetype (resource_type_id, resource_type_name)
   SELECT 1, 'AMBARI'

+ 28 - 1
ambari-server/src/main/resources/Ambari-DDL-Postgres-EMBEDDED-CREATE.sql

@@ -755,6 +755,29 @@ CREATE INDEX idx_alert_history_state on ambari.alert_history(alert_state);
 CREATE INDEX idx_alert_group_name on ambari.alert_group(group_name);
 CREATE INDEX idx_alert_notice_state on ambari.alert_notice(notify_state);
 
+-- upgrade tables
+CREATE TABLE ambari.upgrade (
+  upgrade_id BIGINT NOT NULL,
+  cluster_id BIGINT NOT NULL,
+  state VARCHAR(255) DEFAULT 'NONE' NOT NULL,
+  PRIMARY KEY (upgrade_id),
+  FOREIGN KEY (cluster_id) REFERENCES ambari.clusters(cluster_id)
+);
+
+CREATE TABLE ambari.upgrade_item (
+  upgrade_item_id BIGINT NOT NULL,
+  upgrade_id BIGINT NOT NULL,
+  state VARCHAR(255) DEFAULT 'NONE' NOT NULL,
+  hosts TEXT,
+  tasks TEXT,
+  item_text VARCHAR(1024),
+  PRIMARY KEY (upgrade_item_id),
+  FOREIGN KEY (upgrade_id) REFERENCES ambari.upgrade(upgrade_id)
+);
+
+GRANT ALL PRIVILEGES ON TABLE ambari.upgrade TO :username;
+GRANT ALL PRIVILEGES ON TABLE ambari.upgrade_item TO :username;
+
 ---------inserting some data-----------
 -- In order for the first ID to be 1, must initialize the ambari_sequences table with a sequence_value of 0.
 BEGIN;
@@ -813,7 +836,11 @@ INSERT INTO ambari.ambari_sequences (sequence_name, sequence_value)
   union all
   select 'host_version_id_seq', 0
   union all
-  select 'service_config_id_seq', 1;
+  select 'service_config_id_seq', 1
+  union all
+  select 'upgrade_id_seq', 0 
+  union all
+  select 'upgrade_item_id_seq', 0;
 
 INSERT INTO ambari.adminresourcetype (resource_type_id, resource_type_name)
   SELECT 1, 'AMBARI'

+ 25 - 1
ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql

@@ -256,6 +256,26 @@ CREATE INDEX idx_alert_history_state on alert_history(alert_state);
 CREATE INDEX idx_alert_group_name on alert_group(group_name);
 CREATE INDEX idx_alert_notice_state on alert_notice(notify_state);
 
+-- upgrade tables
+CREATE TABLE upgrade (
+  upgrade_id BIGINT NOT NULL,
+  cluster_id BIGINT NOT NULL,
+  state VARCHAR(255) DEFAULT 'NONE' NOT NULL,
+  PRIMARY KEY (upgrade_id),
+  FOREIGN KEY (cluster_id) REFERENCES ambari.clusters(cluster_id)
+);
+
+CREATE TABLE upgrade_item (
+  upgrade_item_id BIGINT NOT NULL,
+  upgrade_id BIGINT NOT NULL,
+  state VARCHAR(255) DEFAULT 'NONE' NOT NULL,
+  hosts TEXT,
+  tasks TEXT,
+  item_text VARCHAR(1024),
+  PRIMARY KEY (upgrade_item_id),
+  FOREIGN KEY (upgrade_id) REFERENCES upgrade(upgrade_id)
+);
+
 ---------inserting some data-----------
 BEGIN TRANSACTION
   INSERT INTO ambari_sequences (sequence_name, [sequence_value])
@@ -307,7 +327,11 @@ BEGIN TRANSACTION
   UNION ALL
   SELECT 'alert_notice_id_seq', 0
   UNION ALL
-  SELECT 'alert_current_id_seq', 0;
+  SELECT 'alert_current_id_seq', 0
+  UNION ALL
+  SELECT 'upgrade_id_seq', 0
+  UNION ALL
+  SELECT 'upgrade_item_id_seq', 0;
 
   insert into adminresourcetype (resource_type_id, resource_type_name)
     select 1, 'AMBARI'

+ 6 - 0
ambari-server/src/main/resources/Ambari-DDL-SQLServer-DROP.sql

@@ -201,3 +201,9 @@ IF OBJECT_ID('adminprincipaltype', 'U') IS NOT NULL DROP TABLE adminprincipaltyp
 GO
 IF OBJECT_ID('adminresourcetype', 'U') IS NOT NULL DROP TABLE adminresourcetype
 GO
+
+IF OBJECT_ID('upgrade_item', 'U') IS NOT NULL DROP TABLE upgrade_item
+GO
+IF OBJECT_ID('upgrade', 'U') IS NOT NULL DROP TABLE upgrade
+GO
+

+ 2 - 0
ambari-server/src/main/resources/META-INF/persistence.xml

@@ -64,6 +64,8 @@
     <class>org.apache.ambari.server.orm.entities.ServiceConfigEntity</class>
     <class>org.apache.ambari.server.orm.entities.ServiceDesiredStateEntity</class>
     <class>org.apache.ambari.server.orm.entities.StageEntity</class>
+    <class>org.apache.ambari.server.orm.entities.UpgradeEntity</class>
+    <class>org.apache.ambari.server.orm.entities.UpgradeItemEntity</class>
     <class>org.apache.ambari.server.orm.entities.UserEntity</class>
     <class>org.apache.ambari.server.orm.entities.ViewEntity</class>
     <class>org.apache.ambari.server.orm.entities.ViewEntityEntity</class>

+ 22 - 8
ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml

@@ -16,6 +16,7 @@
    limitations under the License.
 -->
 <upgrade xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <target>2.2.*.*</target>
   <order>
     <service name="ZOOKEEPER">
       <component>ZOOKEEPER_SERVER</component>
@@ -28,20 +29,33 @@
     </service>
   </order>
   <processing>
+    <service name="ZOOKEEPER">
+      <component name="ZOOKEEPER_SERVER">
+        <batch xsi:type="count">
+          <count>1</count>
+        </batch>
+      </component>
+    </service>
     <service name="HDFS">
       <component name="NAMENODE">
-        <upgrade>
+        <pre-upgrade>
           <task xsi:type="execute">
-            <command>su - {hdfs-user} -c 'hdp-select hadoop-hdfs-namenode {version}'</command>
+            <command>su - {hdfs-user} -c 'hdfs dfsadmin -rollingUpgrade prepare'</command>
           </task>
-        </upgrade>
+          <task xsi:type="execute">
+            <command>su - {hdfs-user} -c 'hdfs dfsadmin -rollingUpgrade query'</command>
+            <until>Proceed with rolling upgrade</until>
+          </task>
+        </pre-upgrade>
+        <post-upgrade>
+          <task xsi:type="execute">
+            <command>su - {hdfs-user} -c 'hdfs dfsadmin -rollingUpgrade finalize'</command>
+          </task>
+        </post-upgrade>
       </component>
-    </service>
-    <service name="HDFS">
       <component name="DATANODE">
-        <batch xsi:type="conditional">
-          <initial>10</initial>
-          <remaining>25</remaining>
+        <batch xsi:type="percent">
+          <percent>20</percent>
         </batch>
         <upgrade>
           <task xsi:type="execute">

+ 182 - 0
ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java

@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.internal;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.createStrictMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.orm.GuiceJpaInitializer;
+import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
+import org.apache.ambari.server.orm.dao.UpgradeDAO;
+import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
+import org.apache.ambari.server.orm.entities.UpgradeEntity;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.StackId;
+import org.easymock.Capture;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.persist.PersistService;
+import com.google.inject.util.Modules;
+
+/**
+ * UpgradeResourceDefinition tests.
+ */
+public class UpgradeResourceProviderTest {
+
+  private UpgradeDAO upgradeDao = null;
+  private RepositoryVersionDAO repoVersionDao = null;
+  private Injector m_injector;
+
+  @Before
+  public void before() {
+    upgradeDao = createStrictMock(UpgradeDAO.class);
+    repoVersionDao = createStrictMock(RepositoryVersionDAO.class);
+
+    m_injector = Guice.createInjector(Modules.override(
+        new InMemoryDefaultTestModule()).with(new Module() {
+
+          @Override
+          public void configure(Binder binder) {
+            // TODO Auto-generated method stub
+            binder.bind(UpgradeDAO.class).toInstance(upgradeDao);
+            binder.bind(RepositoryVersionDAO.class).toInstance(repoVersionDao);
+          }
+        }));
+    m_injector.getInstance(GuiceJpaInitializer.class);
+  }
+
+  @After
+  public void after() {
+    m_injector.getInstance(PersistService.class).stop();
+    m_injector = null;
+  }
+
+  /**
+   * @throws Exception
+   */
+  @Test
+  public void testCreateResources() throws Exception {
+    AmbariManagementController amc = createMock(AmbariManagementController.class);
+    Clusters clusters = createMock(Clusters.class);
+    Cluster cluster = createCluster();
+
+    expect(amc.getClusters()).andReturn(clusters).atLeastOnce();
+    expect(clusters.getCluster((String) anyObject())).andReturn(cluster).atLeastOnce();
+
+    RepositoryVersionEntity repoVersionEntity = new RepositoryVersionEntity();
+    repoVersionEntity.setUpgradePackage("upgrade_test");
+
+    expect(repoVersionDao.findByStackAndVersion((String)anyObject(),(String)anyObject())).andReturn(repoVersionEntity).atLeastOnce();
+
+    Capture<UpgradeEntity> entityCapture = new Capture<UpgradeEntity>();
+    upgradeDao.create(capture(entityCapture));
+    expectLastCall();
+
+    replay(amc, clusters, cluster, upgradeDao, repoVersionDao);
+
+    UpgradeResourceProvider provider = createProvider(amc);
+
+    Map<String, Object> requestProps = new HashMap<String, Object>();
+    requestProps.put(UpgradeResourceProvider.UPGRADE_CLUSTER_NAME, "c1");
+    requestProps.put(UpgradeResourceProvider.UPGRADE_VERSION, "2.2.2.2");
+
+    Request request = PropertyHelper.getCreateRequest(Collections.singleton(requestProps), null);
+    provider.createResources(request);
+
+    assertTrue(entityCapture.hasCaptured());
+    UpgradeEntity entity = entityCapture.getValue();
+    assertNotNull(entity);
+    assertEquals(Long.valueOf(1), entity.getClusterId());
+    assertEquals(3, entity.getUpgradeItems().size());
+
+    assertTrue(entity.getUpgradeItems().get(0).getText().contains("Preparing"));
+    assertTrue(entity.getUpgradeItems().get(1).getText().contains("Restarting"));
+    assertTrue(entity.getUpgradeItems().get(2).getText().contains("Finalizing"));
+
+    verify(amc, clusters, cluster, upgradeDao);
+  }
+
+
+  /**
+   * @param amc
+   * @return the provider
+   */
+  private UpgradeResourceProvider createProvider(AmbariManagementController amc) {
+    return new UpgradeResourceProvider(amc);
+  }
+
+  private Cluster createCluster() {
+    Cluster cluster = createMock(Cluster.class);
+
+    expect(cluster.getClusterId()).andReturn(Long.valueOf(1)).anyTimes();
+    expect(cluster.getDesiredStackVersion()).andReturn(new StackId("HDP-2.1.1")).atLeastOnce();
+
+    final ServiceComponentHost zk_server_host_comp = createStrictMock(ServiceComponentHost.class);
+    Map<String, ServiceComponentHost> zk_host_comp_map = new HashMap<String, ServiceComponentHost>() {{
+      put("h1", zk_server_host_comp);
+    }};
+
+    final ServiceComponent zk_server_comp = createStrictMock(ServiceComponent.class);
+    expect(zk_server_comp.getServiceComponentHosts()).andReturn(zk_host_comp_map).atLeastOnce();
+
+    Map<String, ServiceComponent> zk_comp_map = new HashMap<String, ServiceComponent>() {{
+      put("ZOOKEEPER_SERVER", zk_server_comp);
+    }};
+
+    final Service zk = createStrictMock(Service.class);
+    expect(zk.getServiceComponents()).andReturn(zk_comp_map).atLeastOnce();
+
+    Map<String, Service> servicesMap = new HashMap<String, Service>() {{
+      put("ZOOKEEPER", zk);
+    }};
+    expect(cluster.getServices()).andReturn(servicesMap).anyTimes();
+
+    replay(zk_server_host_comp, zk_server_comp, zk);
+
+
+    return cluster;
+  }
+
+}

+ 5 - 20
ambari-server/src/test/java/org/apache/ambari/server/orm/dao/UpgradeDAOTest.java

@@ -20,34 +20,18 @@ package org.apache.ambari.server.orm.dao;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
-import java.util.UUID;
 
-import org.apache.ambari.server.controller.RootServiceResponseFactory;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.OrmTestHelper;
-import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
-import org.apache.ambari.server.orm.entities.AlertGroupEntity;
-import org.apache.ambari.server.orm.entities.AlertHistoryEntity;
-import org.apache.ambari.server.orm.entities.AlertNoticeEntity;
-import org.apache.ambari.server.orm.entities.ClusterEntity;
 import org.apache.ambari.server.orm.entities.UpgradeEntity;
 import org.apache.ambari.server.orm.entities.UpgradeItemEntity;
-import org.apache.ambari.server.state.AlertState;
-import org.apache.ambari.server.state.Cluster;
-import org.apache.ambari.server.state.Clusters;
-import org.apache.ambari.server.state.MaintenanceState;
-import org.apache.ambari.server.state.NotificationState;
 import org.apache.ambari.server.state.UpgradeState;
-import org.apache.ambari.server.state.alert.Scope;
-import org.apache.ambari.server.state.alert.SourceType;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -84,21 +68,22 @@ public class UpgradeDAOTest {
 
     // create upgrade entities
     UpgradeEntity entity = new UpgradeEntity();
-    dao.create(entity);
+    entity.setClusterId(Long.valueOf(1));
 
     // create 2 items
     List<UpgradeItemEntity> items = new ArrayList<UpgradeItemEntity>();
     UpgradeItemEntity item = new UpgradeItemEntity();
-    item.setId(Long.valueOf(entity.getId().longValue() + 1000));
     item.setState(UpgradeState.IN_PROGRESS);
+    item.setUpgradeEntity(entity);
     items.add(item);
 
     item = new UpgradeItemEntity();
-    item.setId(Long.valueOf(entity.getId().longValue() + 1001));
     item.setState(UpgradeState.PENDING);
+    item.setUpgradeEntity(entity);
     items.add(item);
 
     entity.setUpgradeItems(items);
+    dao.create(entity);
   }
 
   @After
@@ -115,7 +100,7 @@ public class UpgradeDAOTest {
   }
 
   @Test
-  public void test() throws Exception {
+  public void testFindUpgrade() throws Exception {
     List<UpgradeEntity> items = dao.findUpgrades(clusterId.longValue());
     assertTrue(items.size() > 0);
 

+ 1 - 1
ambari-server/src/test/java/org/apache/ambari/server/state/stack/UpgradePackTest.java

@@ -147,7 +147,7 @@ public class UpgradePackTest {
     assertEquals(Task.Type.CONFIGURE, pc.tasks.get(1).getType());
     assertEquals(ConfigureTask.class, pc.tasks.get(1).getClass());
     assertEquals("hdfs-site",
-        ConfigureTask.class.cast(pc.tasks.get(1)).config);
+        ConfigureTask.class.cast(pc.tasks.get(1)).configType);
     assertEquals("myproperty",
         ConfigureTask.class.cast(pc.tasks.get(1)).key);
     assertEquals("mynewvalue",

+ 12 - 0
ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog200Test.java

@@ -95,6 +95,9 @@ public class UpgradeCatalog200Test {
     Capture<DBAccessor.DBColumnInfo> valueColumnCapture = new Capture<DBAccessor.DBColumnInfo>();
     Capture<DBAccessor.DBColumnInfo> dataValueColumnCapture = new Capture<DBAccessor.DBColumnInfo>();
 
+    Capture<List<DBAccessor.DBColumnInfo>> upgradeCapture = new Capture<List<DBAccessor.DBColumnInfo>>();
+    Capture<List<DBAccessor.DBColumnInfo>> upgradeItemCapture = new Capture<List<DBAccessor.DBColumnInfo>>();
+
     // Alert Definition
     dbAccessor.addColumn(eq("alert_definition"),
         capture(alertDefinitionIgnoreColumnCapture));
@@ -111,6 +114,12 @@ public class UpgradeCatalog200Test {
     dbAccessor.createTable(eq("host_version"),
         capture(hostVersionCapture), eq("id"));
 
+    // Upgrade
+    dbAccessor.createTable(eq("upgrade"), capture(upgradeCapture), eq("upgrade_id"));
+    // Upgrade item
+    dbAccessor.createTable(eq("upgrade_item"), capture(upgradeItemCapture), eq("upgrade_item_id"));
+
+
     setViewInstancePropertyExpectations(dbAccessor, valueColumnCapture);
     setViewInstanceDataExpectations(dbAccessor, dataValueColumnCapture);
 
@@ -142,6 +151,9 @@ public class UpgradeCatalog200Test {
 
     assertViewInstancePropertyColumns(valueColumnCapture);
     assertViewInstanceDataColumns(dataValueColumnCapture);
+
+    assertEquals(3, upgradeCapture.getValue().size());
+    assertEquals(6, upgradeItemCapture.getValue().size());
   }
 
   @Test