
AMBARI-9761 - Performance: Cluster Installation Deadlocks When Setting Component States (jonathanhurley)

Jonathan Hurley, 10 years ago
parent commit 00ae60adb2
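
For context, the deadlock this commit fixes is the classic inconsistent lock-ordering problem: one thread acquires the cluster-wide lock and then a per-component lock, while another thread holds a per-component lock and waits for the cluster-wide lock, so neither can proceed. The minimal sketch below (not part of this commit; the lock names are illustrative, not Ambari's actual fields) reproduces that pattern with two ReentrantReadWriteLocks:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Thread A takes the coarse "cluster" lock first, then the component lock;
    // thread B takes the same two locks in the opposite order. With the sleeps
    // forcing this interleaving, both threads block forever.
    public class LockOrderingDeadlockSketch {
      private static final ReentrantReadWriteLock clusterLock = new ReentrantReadWriteLock();
      private static final ReentrantReadWriteLock componentLock = new ReentrantReadWriteLock();

      public static void main(String[] args) {
        Thread threadA = new Thread(() -> {
          clusterLock.readLock().lock();     // 1st: cluster lock
          sleep(100);
          componentLock.readLock().lock();   // 2nd: component lock -> blocked by B's write lock
          componentLock.readLock().unlock();
          clusterLock.readLock().unlock();
        });

        Thread threadB = new Thread(() -> {
          componentLock.writeLock().lock();  // 1st: component lock
          sleep(100);
          clusterLock.writeLock().lock();    // 2nd: cluster lock -> blocked by A's read lock
          clusterLock.writeLock().unlock();
          componentLock.writeLock().unlock();
        });

        threadA.start();
        threadB.start();
      }

      private static void sleep(long millis) {
        try {
          Thread.sleep(millis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }

The changes below attack that pattern from both sides: simple reads no longer reach for the cluster-wide lock at all, and the remaining critical sections are kept as small as possible.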

+ 13 - 14
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java

@@ -17,26 +17,28 @@
  */
 package org.apache.ambari.server.events.listeners.upgrade;
 
-import com.google.common.eventbus.AllowConcurrentEvents;
-import com.google.common.eventbus.Subscribe;
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.google.inject.Singleton;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.ambari.server.EagerSingleton;
 import org.apache.ambari.server.events.HostComponentVersionEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.state.Cluster;
-import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.ServiceComponentHost;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
 
 /**
- * The {@link StackVersionListener} class handles the propagation of versions advertised by the {@link org.apache.ambari.server.state.ServiceComponentHost}
- * that bubble up to the {@link org.apache.ambari.server.orm.entities.HostVersionEntity} and eventually the
+ * The {@link StackVersionListener} class handles the propagation of versions
+ * advertised by the {@link org.apache.ambari.server.state.ServiceComponentHost}
+ * that bubble up to the
+ * {@link org.apache.ambari.server.orm.entities.HostVersionEntity} and
+ * eventually the
  * {@link org.apache.ambari.server.orm.entities.ClusterVersionEntity}
  */
 @Singleton
@@ -47,8 +49,6 @@ public class StackVersionListener {
    */
   private final static Logger LOG = LoggerFactory.getLogger(StackVersionListener.class);
 
-  private AmbariEventPublisher ambariEventPublisher;
-
   /**
    * Used to prevent multiple threads from trying to create host alerts
    * simultaneously.
@@ -62,7 +62,6 @@ public class StackVersionListener {
    */
   @Inject
   public StackVersionListener(AmbariEventPublisher ambariEventPublisher) {
-    this.ambariEventPublisher = ambariEventPublisher;
     ambariEventPublisher.register(this);
   }
 
@@ -71,12 +70,12 @@ public class StackVersionListener {
   public void onAmbariEvent(HostComponentVersionEvent event) {
     LOG.debug("Received event {}", event);
 
+    Cluster cluster = event.getCluster();
     ServiceComponentHost sch = event.getServiceComponentHost();
 
     m_stackVersionLock.lock();
 
     try {
-      Cluster cluster = event.getCluster();
       String repoVersion = sch.recalculateHostVersionState();
       cluster.recalculateClusterVersionState(repoVersion);
     } catch (Exception e) {
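
The listener change above is small but representative of the approach: pulling the Cluster off the event does not need serializing, so it now happens before m_stackVersionLock is acquired and the lock guards only the two recalculation calls (the unused ambariEventPublisher field is also dropped, since registration is all the constructor needs). A minimal sketch of the same subscriber shape with Guava's EventBus, using an illustrative event type rather than Ambari's, looks like this:

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    import com.google.common.eventbus.AllowConcurrentEvents;
    import com.google.common.eventbus.EventBus;
    import com.google.common.eventbus.Subscribe;

    // Cheap lookups happen before the lock; only the work that must be
    // serialized runs inside the critical section.
    public class VersionListenerSketch {
      private final Lock lock = new ReentrantLock();

      public VersionListenerSketch(EventBus eventBus) {
        eventBus.register(this); // registering is enough; no field is retained
      }

      @Subscribe
      @AllowConcurrentEvents
      public void onEvent(VersionEvent event) {
        String hostName = event.getHostName(); // resolved outside the lock

        lock.lock();
        try {
          recalculate(hostName); // stand-in for the version recalculation calls
        } finally {
          lock.unlock();
        }
      }

      private void recalculate(String hostName) { /* placeholder */ }

      // Hypothetical event type, defined only to keep the sketch self-contained.
      public static final class VersionEvent {
        private final String hostName;
        public VersionEvent(String hostName) { this.hostName = hostName; }
        public String getHostName() { return hostName; }
      }
    }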

+ 7 - 11
ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java

@@ -412,19 +412,15 @@ public class ServiceComponentImpl implements ServiceComponent {
     }
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public boolean isPersisted() {
-    clusterGlobalLock.readLock().lock();
-    try {
-      readWriteLock.readLock().lock();
-      try {
-        return persisted;
-      } finally {
-        readWriteLock.readLock().unlock();
-      }
-    } finally {
-      clusterGlobalLock.readLock().unlock();
-    }
+    // a lock around this internal state variable is not required since we
+    // have appropriate locks in the persist() method and this member is
+    // only ever false under the condition that the object is new
+    return persisted;
   }
 
   @Override

+ 7 - 11
ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java

@@ -408,19 +408,15 @@ public class ServiceImpl implements Service {
     }
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public boolean isPersisted() {
-    clusterGlobalLock.readLock().lock();
-    try {
-      readWriteLock.readLock().lock();
-      try {
-        return persisted;
-      } finally {
-        readWriteLock.readLock().unlock();
-      }
-    } finally {
-      clusterGlobalLock.readLock().unlock();
-    }
+    // a lock around this internal state variable is not required since we
+    // have appropriate locks in the persist() method and this member is
+    // only ever false under the condition that the object is new
+    return persisted;
   }
 
   @Override
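
Both ServiceComponentImpl and ServiceImpl get the same simplification of isPersisted(): the nested clusterGlobalLock/readWriteLock acquisition around a plain boolean read is removed because, as the new comment states, the flag is written only inside the locked persist() path and only ever transitions from false to true while the object is new. The general shape of that "monotonic flag" argument is sketched below; the volatile keyword is part of the sketch, not a claim about the Ambari field:

    // The flag starts false, is flipped to true exactly once under the
    // persist() lock, and is never reset, so an unlocked read is at worst
    // briefly stale rather than wrong.
    public class PersistableSketch {
      private final Object persistLock = new Object();
      private volatile boolean persisted = false;

      public boolean isPersisted() {
        return persisted; // lock-free read of a monotonic flag
      }

      public void persist() {
        synchronized (persistLock) {
          if (!persisted) {
            saveToStore();
            persisted = true;
          }
        }
      }

      private void saveToStore() { /* placeholder for the actual persistence */ }
    }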

+ 15 - 32
ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java

@@ -19,7 +19,6 @@
 package org.apache.ambari.server.state.svccomphost;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -78,7 +77,6 @@ import org.apache.ambari.server.state.fsm.SingleArcTransition;
 import org.apache.ambari.server.state.fsm.StateMachine;
 import org.apache.ambari.server.state.fsm.StateMachineFactory;
 import org.apache.ambari.server.state.stack.upgrade.RepositoryVersionHelper;
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -756,30 +754,20 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
 
   @Override
   public State getState() {
-    clusterGlobalLock.readLock().lock();
-    try {
-      // there's no reason to lock around the state machine for this SCH since
-      // the state machine is synchronized
-      return stateMachine.getCurrentState();
-    } finally {
-      clusterGlobalLock.readLock().unlock();
-    }
+    // there's no reason to lock around the state machine for this SCH since
+    // the state machine is synchronized
+    return stateMachine.getCurrentState();
   }
 
   @Override
   public void setState(State state) {
-    clusterGlobalLock.readLock().lock();
+    writeLock.lock();
     try {
-      writeLock.lock();
-      try {
-        stateMachine.setCurrentState(state);
-        stateEntity.setCurrentState(state);
-        saveIfPersisted();
-      } finally {
-        writeLock.unlock();
-      }
+      stateMachine.setCurrentState(state);
+      stateEntity.setCurrentState(state);
+      saveIfPersisted();
     } finally {
-      clusterGlobalLock.readLock().unlock();
+      writeLock.unlock();
     }
   }
 
@@ -1148,20 +1136,15 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
     }
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public boolean isPersisted() {
-    clusterGlobalLock.readLock().lock();
-    try {
-      readLock.lock();
-      try {
-        return persisted;
-      } finally {
-        readLock.unlock();
-      }
-    } finally {
-      clusterGlobalLock.readLock().unlock();
-    }
-
+    // a lock around this internal state variable is not required since we
+    // have appropriate locks in the persist() method and this member is
+    // only ever false under the condition that the object is new
+    return persisted;
   }
 
   @Override
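
ServiceComponentHostImpl gets the most direct fix for the lock ordering: getState() now relies on the state machine's own synchronization instead of wrapping it in clusterGlobalLock, setState() takes only the SCH's own writeLock, and isPersisted() follows the same lock-free pattern as the service classes above. A rough sketch of that narrowed locking, with illustrative names rather than the actual Ambari fields:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Reads delegate to an already-synchronized state machine; writes take
    // only this object's lock, never a cluster-wide lock on top of it.
    public class HostComponentStateSketch {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private final StateMachine stateMachine = new StateMachine();

      public String getState() {
        // the state machine is internally synchronized, so no outer lock is needed
        return stateMachine.current();
      }

      public void setState(String state) {
        lock.writeLock().lock(); // this object's lock only
        try {
          stateMachine.set(state);
          // ... update the backing entity and save if already persisted ...
        } finally {
          lock.writeLock().unlock();
        }
      }

      // Minimal stand-in for an internally synchronized state machine.
      private static final class StateMachine {
        private String current = "INIT";
        synchronized String current() { return current; }
        synchronized void set(String state) { current = state; }
      }
    }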

+ 254 - 99
ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java

@@ -43,6 +43,7 @@ import org.apache.ambari.server.state.ServiceComponentHost;
 import org.apache.ambari.server.state.ServiceComponentHostFactory;
 import org.apache.ambari.server.state.ServiceFactory;
 import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.State;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -54,10 +55,13 @@ import com.google.inject.Injector;
 import com.google.inject.persist.PersistService;
 
 /**
- * Tests AMBARI-9368 which produced a deadlock during read and writes of some of
- * the impl classes.
+ * Tests AMBARI-9368 and AMBARI-9761 which produced a deadlock during read and
+ * writes of some of the impl classes.
  */
 public class ClusterDeadlockTest {
+  private static final int NUMBER_OF_HOSTS = 100;
+  private static final int NUMBER_OF_THREADS = 3;
+
   private final AtomicInteger hostNameCounter = new AtomicInteger(0);
 
   @Inject
@@ -81,32 +85,51 @@ public class ClusterDeadlockTest {
   @Inject
   private OrmTestHelper helper;
 
+  private StackId stackId = new StackId("HDP-0.1");
+
+  /**
+   * The cluster.
+   */
+  private Cluster cluster;
+
+  /**
+   *
+   */
+  private List<String> hostNames = new ArrayList<String>(NUMBER_OF_HOSTS);
+
+  /**
+   * Creates 100 hosts and adds them to the cluster.
+   *
+   * @throws Exception
+   */
   @Before
   public void setup() throws Exception {
     injector = Guice.createInjector(new InMemoryDefaultTestModule());
     injector.getInstance(GuiceJpaInitializer.class);
     injector.injectMembers(this);
     clusters.addCluster("c1");
-
-    StackId stackId = new StackId("HDP-0.1");
-    Cluster c1 = clusters.getCluster("c1");
-    c1.setDesiredStackVersion(stackId);
+    cluster = clusters.getCluster("c1");
+    cluster.setDesiredStackVersion(stackId);
     helper.getOrCreateRepositoryVersion(stackId.getStackName(), stackId.getStackVersion());
-    c1.createClusterVersion(stackId.getStackName(), stackId.getStackVersion(), "admin", RepositoryVersionState.UPGRADING);
+    cluster.createClusterVersion(stackId.getStackName(),
+        stackId.getStackVersion(), "admin", RepositoryVersionState.UPGRADING);
+
     metaInfo.init();
 
     // 100 hosts
-    for (int i = 0; i < 100; i++) {
+    for (int i = 0; i < NUMBER_OF_HOSTS; i++) {
       String hostName = "c64-" + i;
+      hostNames.add(hostName);
+
       clusters.addHost(hostName);
       setOsFamily(clusters.getHost(hostName), "redhat", "6.4");
       clusters.getHost(hostName).persist();
       clusters.mapHostToCluster(hostName, "c1");
     }
 
-    // force creation of the service and the components on the last host
-    createNewServiceComponentHost("HDFS", "NAMENODE", "c64-99", false);
-    createNewServiceComponentHost("HDFS", "HDFS_CLIENT", "c64-99", true);
+    Service service = installService("HDFS");
+    addServiceComponent(service, "NAMENODE");
+    addServiceComponent(service, "DATANODE");
   }
 
   @After
@@ -122,26 +145,25 @@ public class ClusterDeadlockTest {
    */
   @Test(timeout = 30000)
   public void testDeadlockBetweenImplementations() throws Exception {
-    Cluster cluster = clusters.getCluster("c1");
     Service service = cluster.getService("HDFS");
-    ServiceComponent namenodeComponent = service.getServiceComponent("NAMENODE");
-    ServiceComponent hdfsClientComponent = service.getServiceComponent("HDFS_CLIENT");
+    ServiceComponent nameNodeComponent = service.getServiceComponent("NAMENODE");
+    ServiceComponent dataNodeComponent = service.getServiceComponent("DATANODE");
 
-    ServiceComponentHost namenodeSCH = createNewServiceComponentHost("HDFS",
-        "NAMENODE", "c64-0", false);
+    ServiceComponentHost nameNodeSCH = createNewServiceComponentHost("HDFS",
+        "NAMENODE", "c64-0");
 
-    ServiceComponentHost hdfsClientSCH = createNewServiceComponentHost("HDFS",
-        "HDFS_CLIENT", "c64-0", true);
+    ServiceComponentHost dataNodeSCH = createNewServiceComponentHost("HDFS",
+        "DATANODE", "c64-0");
 
     List<Thread> threads = new ArrayList<Thread>();
-    for (int i = 0; i < 3; i++) {
+    for (int i = 0; i < NUMBER_OF_THREADS; i++) {
       DeadlockExerciserThread thread = new DeadlockExerciserThread();
       thread.setCluster(cluster);
       thread.setService(service);
-      thread.setHdfsClientComponent(hdfsClientComponent);
-      thread.setNamenodeComponent(namenodeComponent);
-      thread.setNamenodeSCH(namenodeSCH);
-      thread.setHdfsClientSCH(hdfsClientSCH);
+      thread.setDataNodeComponent(dataNodeComponent);
+      thread.setNameNodeComponent(nameNodeComponent);
+      thread.setNameNodeSCH(nameNodeSCH);
+      thread.setDataNodeSCH(dataNodeSCH);
       thread.start();
       threads.add(thread);
     }
@@ -157,18 +179,17 @@ public class ClusterDeadlockTest {
    *
    * @throws Exception
    */
-  @Test(timeout = 30000)
+  @Test(timeout = 35000)
   public void testAddingHostComponentsWhileReading() throws Exception {
-    Cluster cluster = clusters.getCluster("c1");
     Service service = cluster.getService("HDFS");
-    ServiceComponent namenodeComponent = service.getServiceComponent("NAMENODE");
-    ServiceComponent hdfsClientComponent = service.getServiceComponent("HDFS_CLIENT");
+    ServiceComponent nameNodeComponent = service.getServiceComponent("NAMENODE");
+    ServiceComponent dataNodeComponent = service.getServiceComponent("DATANODE");
 
     List<Thread> threads = new ArrayList<Thread>();
     for (int i = 0; i < 5; i++) {
-      ServiceComponentDeadlockThread thread = new ServiceComponentDeadlockThread();
-      thread.setHdfsClientComponent(hdfsClientComponent);
-      thread.setNamenodeComponent(namenodeComponent);
+      ServiceComponentReaderWriterThread thread = new ServiceComponentReaderWriterThread();
+      thread.setDataNodeComponent(dataNodeComponent);
+      thread.setNameNodeComponent(nameNodeComponent);
       thread.start();
       threads.add(thread);
     }
@@ -179,27 +200,149 @@ public class ClusterDeadlockTest {
   }
 
   /**
-   * Tests AMBARI-9368 which saw a deadlock when adding a service component host
-   * while reading a service component.
+   * Tests that no deadlock exists while restarting components and reading from
+   * the cluster.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 60000)
+  public void testDeadlockWhileRestartingComponents() throws Exception {
+    // for each host, install both components
+    List<ServiceComponentHost> serviceComponentHosts = new ArrayList<ServiceComponentHost>();
+    for (String hostName : hostNames) {
+      serviceComponentHosts.add(createNewServiceComponentHost("HDFS",
+          "NAMENODE", hostName));
+
+      serviceComponentHosts.add(createNewServiceComponentHost("HDFS",
+          "DATANODE", hostName));
+    }
+
+    // !!! needed to populate some maps; without this, the cluster report
+    // won't do anything and this test will be worthless
+    ((ClusterImpl) cluster).loadServiceHostComponents();
+
+    List<Thread> threads = new ArrayList<Thread>();
+    for (int i = 0; i < NUMBER_OF_THREADS; i++) {
+      ClusterReaderThread clusterReaderThread = new ClusterReaderThread();
+      ClusterWriterThread clusterWriterThread = new ClusterWriterThread();
+      ServiceComponentRestartThread schWriterThread = new ServiceComponentRestartThread(
+          serviceComponentHosts);
+
+      threads.add(clusterReaderThread);
+      threads.add(clusterWriterThread);
+      threads.add(schWriterThread);
+
+      clusterReaderThread.start();
+      clusterWriterThread.start();
+      schWriterThread.start();
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+  }
+
+  /**
+   * The {@link ClusterReaderThread} reads from a cluster over and over again
+   * with a slight pause.
+   */
+  private final class ClusterReaderThread extends Thread {
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void run() {
+      try {
+        for (int i = 0; i < 1500; i++) {
+          cluster.convertToResponse();
+          Thread.sleep(10);
+        }
+      } catch (Exception exception) {
+        throw new RuntimeException(exception);
+      }
+    }
+  }
+
+  /**
+   * The {@link ClusterWriterThread} writes some information to the cluster
+   * instance over and over again with a slight pause.
+   */
+  private final class ClusterWriterThread extends Thread {
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void run() {
+      try {
+        for (int i = 0; i < 1500; i++) {
+          cluster.setDesiredStackVersion(stackId);
+          Thread.sleep(10);
+        }
+      } catch (Exception exception) {
+        throw new RuntimeException(exception);
+      }
+    }
+  }
+
+  /**
+   * The {@link ServiceComponentRestartThread} is used to constantly set SCH
+   * restart values.
+   */
+  private final class ServiceComponentRestartThread extends Thread {
+    private List<ServiceComponentHost> serviceComponentHosts;
+
+    /**
+     * Constructor.
+     *
+     * @param serviceComponentHosts
+     */
+    private ServiceComponentRestartThread(
+        List<ServiceComponentHost> serviceComponentHosts) {
+      this.serviceComponentHosts = serviceComponentHosts;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void run() {
+      try {
+        for (int i = 0; i < 1000; i++) {
+          // about 30ms to go through all SCHs, no sleep needed
+          for (ServiceComponentHost serviceComponentHost : serviceComponentHosts) {
+            serviceComponentHost.setRestartRequired(true);
+          }
+        }
+      } catch (Exception exception) {
+        throw new RuntimeException(exception);
+      }
+    }
+  }
+
+  /**
+   * The {@link ServiceComponentRestartThread} is used to continuously add
+   * components to hosts while reading from those components.
    */
-  private final class ServiceComponentDeadlockThread extends Thread {
-    private ServiceComponent namenodeComponent;
-    private ServiceComponent hdfsClientComponent;
+  private final class ServiceComponentReaderWriterThread extends Thread {
+    private ServiceComponent nameNodeComponent;
+    private ServiceComponent dataNodeComponent;
 
     /**
-     * @param namenodeComponent
-     *          the namenodeComponent to set
+     * @param nameNodeComponent
+     *          the nameNodeComponent to set
      */
-    public void setNamenodeComponent(ServiceComponent namenodeComponent) {
-      this.namenodeComponent = namenodeComponent;
+    public void setNameNodeComponent(ServiceComponent nameNodeComponent) {
+      this.nameNodeComponent = nameNodeComponent;
     }
 
     /**
-     * @param hdfsClientComponent
-     *          the hdfsClientComponent to set
+     * @param dataNodeComponent
+     *          the dataNodeComponent to set
      */
-    public void setHdfsClientComponent(ServiceComponent hdfsClientComponent) {
-      this.hdfsClientComponent = hdfsClientComponent;
+    public void setDataNodeComponent(ServiceComponent dataNodeComponent) {
+      this.dataNodeComponent = dataNodeComponent;
     }
 
     /**
@@ -211,13 +354,13 @@ public class ClusterDeadlockTest {
         for (int i = 0; i < 15; i++) {
           int hostNumeric = hostNameCounter.getAndIncrement();
 
-          namenodeComponent.convertToResponse();
+          nameNodeComponent.convertToResponse();
           createNewServiceComponentHost("HDFS", "NAMENODE", "c64-"
-              + hostNumeric, false);
+              + hostNumeric);
 
-          hdfsClientComponent.convertToResponse();
-          createNewServiceComponentHost("HDFS", "HDFS_CLIENT", "c64-"
-              + hostNumeric, true);
+          dataNodeComponent.convertToResponse();
+          createNewServiceComponentHost("HDFS", "DATANODE", "c64-"
+              + hostNumeric);
 
           Thread.sleep(10);
         }
@@ -234,10 +377,10 @@ public class ClusterDeadlockTest {
   private static final class DeadlockExerciserThread extends Thread {
     private Cluster cluster;
     private Service service;
-    private ServiceComponent namenodeComponent;
-    private ServiceComponent hdfsClientComponent;
-    private ServiceComponentHost namenodeSCH;
-    private ServiceComponentHost hdfsClientSCH;
+    private ServiceComponent nameNodeComponent;
+    private ServiceComponent dataNodeComponent;
+    private ServiceComponentHost nameNodeSCH;
+    private ServiceComponentHost dataNodeSCH;
 
     /**
      * @param cluster
@@ -256,35 +399,35 @@ public class ClusterDeadlockTest {
     }
 
     /**
-     * @param namenodeComponent
-     *          the namenodeComponent to set
+     * @param nameNodeComponent
+     *          the nameNodeComponent to set
      */
-    public void setNamenodeComponent(ServiceComponent namenodeComponent) {
-      this.namenodeComponent = namenodeComponent;
+    public void setNameNodeComponent(ServiceComponent nameNodeComponent) {
+      this.nameNodeComponent = nameNodeComponent;
     }
 
     /**
-     * @param hdfsClientComponent
-     *          the hdfsClientComponent to set
+     * @param dataNodeComponent
+     *          the dataNodeComponent to set
      */
-    public void setHdfsClientComponent(ServiceComponent hdfsClientComponent) {
-      this.hdfsClientComponent = hdfsClientComponent;
+    public void setDataNodeComponent(ServiceComponent dataNodeComponent) {
+      this.dataNodeComponent = dataNodeComponent;
     }
 
     /**
-     * @param namenodeSCH
-     *          the namenodeSCH to set
+     * @param nameNodeSCH
+     *          the nameNodeSCH to set
      */
-    public void setNamenodeSCH(ServiceComponentHost namenodeSCH) {
-      this.namenodeSCH = namenodeSCH;
+    public void setNameNodeSCH(ServiceComponentHost nameNodeSCH) {
+      this.nameNodeSCH = nameNodeSCH;
     }
 
     /**
-     * @param hdfsClientSCH
-     *          the hdfsClientSCH to set
+     * @param dataNodeSCH
+     *          the dataNodeSCH to set
      */
-    public void setHdfsClientSCH(ServiceComponentHost hdfsClientSCH) {
-      this.hdfsClientSCH = hdfsClientSCH;
+    public void setDataNodeSCH(ServiceComponentHost dataNodeSCH) {
+      this.dataNodeSCH = dataNodeSCH;
     }
 
     /**
@@ -296,18 +439,18 @@ public class ClusterDeadlockTest {
         for (int i = 0; i < 10; i++) {
           cluster.convertToResponse();
           service.convertToResponse();
-          namenodeComponent.convertToResponse();
-          hdfsClientComponent.convertToResponse();
-          namenodeSCH.convertToResponse();
-          hdfsClientSCH.convertToResponse();
+          nameNodeComponent.convertToResponse();
+          dataNodeComponent.convertToResponse();
+          nameNodeSCH.convertToResponse();
+          dataNodeSCH.convertToResponse();
 
           cluster.setProvisioningState(org.apache.ambari.server.state.State.INIT);
           service.setMaintenanceState(MaintenanceState.OFF);
-          namenodeComponent.setDesiredState(org.apache.ambari.server.state.State.STARTED);
-          hdfsClientComponent.setDesiredState(org.apache.ambari.server.state.State.INSTALLED);
+          nameNodeComponent.setDesiredState(org.apache.ambari.server.state.State.STARTED);
+          dataNodeComponent.setDesiredState(org.apache.ambari.server.state.State.INSTALLED);
 
-          namenodeSCH.setState(org.apache.ambari.server.state.State.STARTED);
-          hdfsClientSCH.setState(org.apache.ambari.server.state.State.INSTALLED);
+          nameNodeSCH.setState(org.apache.ambari.server.state.State.STARTED);
+          dataNodeSCH.setState(org.apache.ambari.server.state.State.INSTALLED);
 
           Thread.sleep(100);
         }
@@ -325,39 +468,51 @@ public class ClusterDeadlockTest {
   }
 
   private ServiceComponentHost createNewServiceComponentHost(String svc,
-      String svcComponent, String hostName, boolean isClient)
-      throws AmbariException {
-    Cluster c = clusters.getCluster("c1");
-    Assert.assertNotNull(c.getConfigGroups());
-    return createNewServiceComponentHost(c, svc, svcComponent, hostName);
-  }
+      String svcComponent, String hostName) throws AmbariException {
+    Assert.assertNotNull(cluster.getConfigGroups());
+    Service s = installService(svc);
+    ServiceComponent sc = addServiceComponent(s, svcComponent);
+
+    ServiceComponentHost sch = serviceComponentHostFactory.createNew(sc,
+        hostName);
 
-  private ServiceComponentHost createNewServiceComponentHost(Cluster c,
-      String svc, String svcComponent, String hostName) throws AmbariException {
+    sc.addServiceComponentHost(sch);
+    sch.setDesiredState(State.INSTALLED);
+    sch.setState(State.INSTALLED);
+    sch.setDesiredStackVersion(stackId);
+    sch.setStackVersion(stackId);
 
-    Service s = null;
+    sch.persist();
+    return sch;
+  }
+
+  private Service installService(String serviceName) throws AmbariException {
+    Service service = null;
 
     try {
-      s = c.getService(svc);
+      service = cluster.getService(serviceName);
     } catch (ServiceNotFoundException e) {
-      s = serviceFactory.createNew(c, svc);
-      c.addService(s);
-      s.persist();
+      service = serviceFactory.createNew(cluster, serviceName);
+      cluster.addService(service);
+      service.persist();
     }
 
-    ServiceComponent sc = null;
+    return service;
+  }
+
+  private ServiceComponent addServiceComponent(Service service,
+      String componentName) throws AmbariException {
+    ServiceComponent serviceComponent = null;
     try {
-      sc = s.getServiceComponent(svcComponent);
+      serviceComponent = service.getServiceComponent(componentName);
     } catch (ServiceComponentNotFoundException e) {
-      sc = serviceComponentFactory.createNew(s, svcComponent);
-      s.addServiceComponent(sc);
-      sc.persist();
+      serviceComponent = serviceComponentFactory.createNew(service,
+          componentName);
+      service.addServiceComponent(serviceComponent);
+      serviceComponent.setDesiredState(State.INSTALLED);
+      serviceComponent.persist();
     }
 
-    ServiceComponentHost impl = serviceComponentHostFactory.createNew(sc,
-        hostName);
-
-    impl.persist();
-    return impl;
+    return serviceComponent;
   }
 }
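
The reworked ClusterDeadlockTest exercises these paths by installing NAMENODE and DATANODE on all 100 hosts and then running reader, writer, and restart threads concurrently; the @Test timeouts turn a deadlock into a test failure instead of a hung build. The detection idea, stripped of the Ambari-specific setup and with stand-in workloads, is roughly this:

    import java.util.ArrayList;
    import java.util.List;

    import org.junit.Test;

    // Start competing reader and writer threads and join them all; if they
    // deadlock, join() never returns and the JUnit timeout fails the test.
    public class DeadlockDetectionSketchTest {

      @Test(timeout = 30000)
      public void competingThreadsFinishWithinTimeout() throws Exception {
        List<Thread> threads = new ArrayList<Thread>();
        for (int i = 0; i < 3; i++) {
          threads.add(new Thread(() -> readSharedState()));
          threads.add(new Thread(() -> writeSharedState()));
        }

        for (Thread thread : threads) {
          thread.start();
        }

        for (Thread thread : threads) {
          thread.join();
        }
      }

      private void readSharedState() { /* repeatedly read cluster-like state */ }

      private void writeSharedState() { /* repeatedly mutate cluster-like state */ }
    }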