Parcourir la source

AMBARI-17970 - Ambari Sends Cached Configurations On Initial Task Execution Attempt (jonathanhurley)

Jonathan Hurley il y a 9 ans
Parent
commit
55303c4af2
17 fichiers modifiés avec 898 ajouts et 206 suppressions
  1. 12 20
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
  2. 119 16
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
  3. 9 6
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java
  4. 34 0
      ambari-server/src/main/java/org/apache/ambari/server/events/jpa/EntityManagerCacheInvalidationEvent.java
  5. 79 0
      ambari-server/src/main/java/org/apache/ambari/server/events/jpa/JPAEvent.java
  6. 72 0
      ambari-server/src/main/java/org/apache/ambari/server/events/publishers/JPAEventPublisher.java
  7. 4 4
      ambari-server/src/main/java/org/apache/ambari/server/orm/dao/ClusterDAO.java
  8. 30 7
      ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
  9. 50 0
      ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ActionManagerTestHelper.java
  10. 1 7
      ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
  11. 6 11
      ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
  12. 49 37
      ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
  13. 270 0
      ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionSchedulerThreading.java
  14. 52 49
      ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java
  15. 0 20
      ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatTestHelper.java
  16. 29 29
      ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
  17. 82 0
      ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java

+ 12 - 20
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java

@@ -21,20 +21,13 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.ambari.server.AmbariException;
-import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.CommandReport;
-import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.ExecuteActionRequest;
-import org.apache.ambari.server.controller.HostsMap;
-import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
-import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.topology.TopologyManager;
 import org.apache.ambari.server.utils.CommandUtils;
 import org.apache.ambari.server.utils.StageUtils;
@@ -43,8 +36,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.google.inject.name.Named;
-import com.google.inject.persist.UnitOfWork;
 
 
 /**
@@ -55,25 +46,26 @@ public class ActionManager {
   private static Logger LOG = LoggerFactory.getLogger(ActionManager.class);
   private final ActionScheduler scheduler;
   private final ActionDBAccessor db;
-  private final ActionQueue actionQueue;
   private final AtomicLong requestCounter;
   private final RequestFactory requestFactory;
   private static TopologyManager topologyManager;
 
 
+  /**
+   * Guice-injected Constructor.
+   *
+   * @param db
+   * @param requestFactory
+   * @param scheduler
+   */
   @Inject
-  public ActionManager(@Named("schedulerSleeptime") long schedulerSleepTime,
-                       @Named("actionTimeout") long actionTimeout,
-                       ActionQueue aq, Clusters fsm, ActionDBAccessor db, HostsMap hostsMap,
-                       UnitOfWork unitOfWork,
-                       RequestFactory requestFactory, Configuration configuration,
-                       AmbariEventPublisher ambariEventPublisher) {
-    actionQueue = aq;
+  public ActionManager(ActionDBAccessor db, RequestFactory requestFactory,
+      ActionScheduler scheduler) {
     this.db = db;
-    scheduler = new ActionScheduler(schedulerSleepTime, actionTimeout, db,
-        actionQueue, fsm, 2, hostsMap, unitOfWork, ambariEventPublisher, configuration);
-    requestCounter = new AtomicLong(db.getLastPersistedRequestIdWhenInitialized());
     this.requestFactory = requestFactory;
+    this.scheduler = scheduler;
+
+    requestCounter = new AtomicLong(db.getLastPersistedRequestIdWhenInitialized());
   }
 
   public void start() {

+ 119 - 16
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java

@@ -31,6 +31,8 @@ import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import javax.persistence.EntityManager;
+
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.ClusterNotFoundException;
 import org.apache.ambari.server.Role;
@@ -45,7 +47,9 @@ import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.HostsMap;
 import org.apache.ambari.server.events.ActionFinalReportReceivedEvent;
+import org.apache.ambari.server.events.jpa.EntityManagerCacheInvalidationEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.JPAEventPublisher;
 import org.apache.ambari.server.orm.entities.RequestEntity;
 import org.apache.ambari.server.serveraction.ServerActionExecutor;
 import org.apache.ambari.server.state.Cluster;
@@ -68,7 +72,12 @@ import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
+import com.google.common.eventbus.Subscribe;
 import com.google.common.reflect.TypeToken;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
 import com.google.inject.persist.UnitOfWork;
 
 
@@ -77,6 +86,7 @@ import com.google.inject.persist.UnitOfWork;
  * Action schedule frequently looks at action database and determines if
  * there is an action that can be scheduled.
  */
+@Singleton
 class ActionScheduler implements Runnable {
 
   private static Logger LOG = LoggerFactory.getLogger(ActionScheduler.class);
@@ -84,21 +94,39 @@ class ActionScheduler implements Runnable {
   public static final String FAILED_TASK_ABORT_REASONING =
           "Server considered task failed and automatically aborted it";
 
+  @Inject
+  private UnitOfWork unitOfWork;
+
+  @Inject
+  private ActionQueue actionQueue;
+
+  @Inject
+  private Clusters clusters;
+
+  @Inject
+  private AmbariEventPublisher ambariEventPublisher;
+
+  @Inject
+  private HostsMap hostsMap;
+
+  @Inject
+  private Configuration configuration;
+
+  @Inject
+  Provider<EntityManager> entityManagerProvider;
+
+  volatile EntityManager threadEntityManager;
+
   private final long actionTimeout;
   private final long sleepTime;
-  private final UnitOfWork unitOfWork;
   private volatile boolean shouldRun = true;
   private Thread schedulerThread = null;
   private final ActionDBAccessor db;
-  private final short maxAttempts;
-  private final ActionQueue actionQueue;
-  private final Clusters clusters;
-  private final AmbariEventPublisher ambariEventPublisher;
+  private short maxAttempts = 2;
+  private final JPAEventPublisher jpaPublisher;
   private boolean taskTimeoutAdjustment = true;
-  private final HostsMap hostsMap;
   private final Object wakeupSyncObject = new Object();
   private final ServerActionExecutor serverActionExecutor;
-  private final Configuration configuration;
 
   private final Set<Long> requestsInProgress = new HashSet<Long>();
 
@@ -128,31 +156,82 @@ class ActionScheduler implements Runnable {
   private Cache<String, Map<String, String>> commandParamsStageCache;
   private Cache<String, Map<String, String>> hostParamsStageCache;
 
-  public ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec,
-                         ActionDBAccessor db, ActionQueue actionQueue, Clusters fsmObject,
-                         int maxAttempts, HostsMap hostsMap,
-                         UnitOfWork unitOfWork, AmbariEventPublisher ambariEventPublisher,
-                         Configuration configuration) {
+  /**
+   * Guice-injected Constructor.
+   *
+   * @param sleepTime
+   * @param actionTimeout
+   * @param db
+   * @param jpaPublisher
+   */
+  @Inject
+  public ActionScheduler(@Named("schedulerSleeptime") long sleepTime,
+      @Named("actionTimeout") long actionTimeout, ActionDBAccessor db,
+      JPAEventPublisher jpaPublisher) {
+
+    this.sleepTime = sleepTime;
+    this.actionTimeout = actionTimeout;
+    this.db = db;
+
+    this.jpaPublisher = jpaPublisher;
+    this.jpaPublisher.register(this);
+
+    serverActionExecutor = new ServerActionExecutor(db, sleepTime);
+
+    initializeCaches();
+  }
+
+  /**
+   * Unit Test Constructor.
+   *
+   * @param sleepTimeMilliSec
+   * @param actionTimeoutMilliSec
+   * @param db
+   * @param actionQueue
+   * @param fsmObject
+   * @param maxAttempts
+   * @param hostsMap
+   * @param unitOfWork
+   * @param ambariEventPublisher
+   * @param configuration
+   */
+  protected ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec, ActionDBAccessor db,
+      ActionQueue actionQueue, Clusters fsmObject, int maxAttempts, HostsMap hostsMap,
+      UnitOfWork unitOfWork, AmbariEventPublisher ambariEventPublisher,
+      Configuration configuration, Provider<EntityManager> entityManagerProvider) {
+
     sleepTime = sleepTimeMilliSec;
-    this.hostsMap = hostsMap;
     actionTimeout = actionTimeoutMilliSec;
     this.db = db;
     this.actionQueue = actionQueue;
     clusters = fsmObject;
-    this.ambariEventPublisher = ambariEventPublisher;
     this.maxAttempts = (short) maxAttempts;
-    serverActionExecutor = new ServerActionExecutor(db, sleepTimeMilliSec);
+    this.hostsMap = hostsMap;
     this.unitOfWork = unitOfWork;
+    this.ambariEventPublisher = ambariEventPublisher;
+    this.configuration = configuration;
+    this.entityManagerProvider = entityManagerProvider;
+    jpaPublisher = null;
+
+    serverActionExecutor = new ServerActionExecutor(db, sleepTime);
+    initializeCaches();
+  }
+
+  /**
+   * Initializes the caches.
+   */
+  private void initializeCaches(){
     clusterHostInfoCache = CacheBuilder.newBuilder().
         expireAfterAccess(5, TimeUnit.MINUTES).
         build();
+
     commandParamsStageCache = CacheBuilder.newBuilder().
       expireAfterAccess(5, TimeUnit.MINUTES).
       build();
+
     hostParamsStageCache = CacheBuilder.newBuilder().
       expireAfterAccess(5, TimeUnit.MINUTES).
       build();
-    this.configuration = configuration;
   }
 
   public void start() {
@@ -195,7 +274,9 @@ class ActionScheduler implements Runnable {
           }
           activeAwakeRequest = false;
         }
+
         doWork();
+
       } catch (InterruptedException ex) {
         LOG.warn("Scheduler thread is interrupted going to stop", ex);
         shouldRun = false;
@@ -213,6 +294,9 @@ class ActionScheduler implements Runnable {
     try {
       unitOfWork.begin();
 
+      // grab a reference to this UnitOfWork's EM
+      threadEntityManager = entityManagerProvider.get();
+
       // The first thing to do is to abort requests that are cancelled
       processCancelledRequestsList();
 
@@ -1159,6 +1243,25 @@ class ActionScheduler implements Runnable {
     return serverActionExecutor;
   }
 
+  /**
+   * Handles {@link EntityManagerCacheInvalidationEvent} instances and instructs
+   * the thread running this scheduler to evict instances from the
+   * {@link EntityManager}.
+   *
+   * @param event
+   *          the event to handle (not {@code null}).
+   */
+  @Subscribe
+  public void onEvent(EntityManagerCacheInvalidationEvent event) {
+    try {
+      if (null != threadEntityManager && threadEntityManager.isOpen()) {
+        threadEntityManager.clear();
+      }
+    } catch (Throwable throwable) {
+      LOG.error("Unable to clear the EntityManager for the scheduler thread", throwable);
+    }
+  }
+
   static class RoleStats {
     int numInProgress;
     int numQueued = 0;

+ 9 - 6
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java

@@ -30,6 +30,7 @@ import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.state.DesiredConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,7 +42,6 @@ import com.google.inject.assistedinject.AssistedInject;
 public class ExecutionCommandWrapper {
 
   private final static Logger LOG = LoggerFactory.getLogger(ExecutionCommandWrapper.class);
-  private static String DELETED = "DELETED_";
   String jsonExecutionCommand = null;
   ExecutionCommand executionCommand = null;
 
@@ -88,8 +88,7 @@ public class ExecutionCommandWrapper {
 
     if( null == jsonExecutionCommand ){
       throw new RuntimeException(
-          "Invalid ExecutionCommandWrapper, both object and string"
-              + " representations are null");
+          "Invalid ExecutionCommandWrapper, both object and string representations are null");
     }
 
     try {
@@ -122,13 +121,17 @@ public class ExecutionCommandWrapper {
       // be refreshed with the latest.
       boolean refreshConfigTagsBeforeExecution = executionCommand.getForceRefreshConfigTagsBeforeExecution();
       if (refreshConfigTagsBeforeExecution) {
+        Map<String, DesiredConfig> desiredConfigs = cluster.getDesiredConfigs();
+
         Map<String, Map<String, String>> configurationTags = configHelper.getEffectiveDesiredTags(
-            cluster, executionCommand.getHostname());
+            cluster, executionCommand.getHostname(), desiredConfigs);
+
+        LOG.debug(
+            "While scheduling task {} on cluster {}, configurations are being refreshed using desired configurations of {}",
+            executionCommand.getTaskId(), cluster.getClusterName(), desiredConfigs);
 
         // then clear out any existing configurations so that all of the new
         // configurations are forcefully applied
-        LOG.debug("Refreshing all configuration tags before execution");
-
         configurations.clear();
         executionCommand.setConfigurationTags(configurationTags);
       }

+ 34 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/jpa/EntityManagerCacheInvalidationEvent.java

@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.jpa;
+
+/**
+ * The {@link EntityManagerCacheInvalidationEvent} represents an event which
+ * instructs JPA to clear its cache.
+ */
+public final class EntityManagerCacheInvalidationEvent extends JPAEvent {
+
+  /**
+   * Constructor.
+   *
+   * @param eventType
+   */
+  public EntityManagerCacheInvalidationEvent() {
+    super(JPAEventType.CACHE_INVALIDATION);
+  }
+}

+ 79 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/jpa/JPAEvent.java

@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.jpa;
+
+import javax.persistence.EntityManagerFactory;
+
+import org.apache.ambari.server.events.AmbariEvent;
+
+/**
+ * The {@link JPAEvent} class is the base for all JPA events in Ambari. Although
+ * this class could inherit from {@link AmbariEvent}, the publishers for
+ * {@link AmbariEvent} instances use an asynchronous model. With JPA, we want a
+ * synchronous publication model in order to ensure that events are handled
+ * within the scope of the method invoking them.
+ */
+public abstract class JPAEvent {
+
+  /**
+   * The {@link JPAEventType} defines the type of JPA event.
+   */
+  public enum JPAEventType {
+
+    /**
+     * An event which instructs an {@link EntityManagerFactory} to evict
+     * instances of a particular class.
+     */
+    CACHE_INVALIDATION;
+  }
+
+  /**
+   * The concrete event's type.
+   */
+  protected final JPAEventType m_eventType;
+
+  /**
+   * Constructor.
+   *
+   * @param eventType
+   *          the type of event (not {@code null}).
+   */
+  public JPAEvent(JPAEventType eventType) {
+    m_eventType = eventType;
+  }
+
+  /**
+   * Gets the type of {@link JPAEvent}.
+   *
+   * @return the event type (never {@code null}).
+   */
+  public JPAEventType getType() {
+    return m_eventType;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder(getClass().getSimpleName());
+    buffer.append("{eventType=").append(m_eventType);
+    buffer.append("}");
+    return buffer.toString();
+  }
+}

+ 72 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/publishers/JPAEventPublisher.java

@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events.publishers;
+
+import org.apache.ambari.server.events.jpa.JPAEvent;
+import org.apache.ambari.server.orm.entities.ClusterEntity;
+
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link JPAEventPublisher} is used to publish events pertaining to
+ * EclipseLink and the persistence layer. This class uses a synchronized bus and
+ * will therefore run on the current thread. This is desirable since JPA events
+ * are time-senstive and cannot be executed in an asynchronous manner most of
+ * the time.
+ * <p/>
+ * This publisher was created specifically for AMBARI-17970. The headless server
+ * thread which monitors the database for things to do (known as the action
+ * scheduler) polls every second. However, it takes EclipseLink up to 1000ms to
+ * update L1 persistence context cache references in other threads. Therefore,
+ * it's possible that the action scheduler can pickup a {@link ClusterEntity}
+ * with stale data, including stale configurations. Those stale configurations
+ * can then be sent in commands down to agents.
+ */
+@Singleton
+public class JPAEventPublisher {
+  /**
+   * A single threaded, synchronous event bus for processing JPA events.
+   */
+  private final EventBus m_eventBus = new EventBus("ambari-jpa-event-bus");
+
+
+  /**
+   * Publishes the specified event to all registered listeners that
+   * {@link com.google.common.eventbus.Subscribe} to any of the
+   * {@link JPAEventPublisher} instances.
+   *
+   * @param event
+   *          the event
+   */
+  public void publish(JPAEvent event) {
+    m_eventBus.post(event);
+  }
+
+  /**
+   * Register a listener to receive events. The listener should use the
+   * {@link com.google.common.eventbus.Subscribe} annotation.
+   *
+   * @param object
+   *          the listener to receive events.
+   */
+  public void register(Object object) {
+    m_eventBus.register(object);
+  }
+}

+ 4 - 4
ambari-server/src/main/java/org/apache/ambari/server/orm/dao/ClusterDAO.java

@@ -317,11 +317,11 @@ public class ClusterDAO {
   public void removeConfigMapping(ClusterConfigMappingEntity entity) {
     entityManagerProvider.get().remove(entity);
   }
-  
-  
+
+
   /**
    * Sets selected = 0, for clusterConfigEntities which has type_name which is in the given types list
-   * 
+   *
    * @param clusterId
    *          the cluster that the service is a part of.
    * @param types
@@ -332,7 +332,7 @@ public class ClusterDAO {
       if(types.isEmpty()) {
         return;
       }
-      
+
       TypedQuery<Long> query = entityManagerProvider.get().createQuery
           ("DELETE FROM ClusterConfigMappingEntity configs WHERE configs" +
             ".clusterId=?1 AND configs.typeName IN ?2", Long.class);

+ 30 - 7
ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java

@@ -62,7 +62,10 @@ import org.apache.ambari.server.controller.internal.UpgradeResourceProvider;
 import org.apache.ambari.server.events.AmbariEvent.AmbariEventType;
 import org.apache.ambari.server.events.ClusterConfigChangedEvent;
 import org.apache.ambari.server.events.ClusterEvent;
+import org.apache.ambari.server.events.jpa.EntityManagerCacheInvalidationEvent;
+import org.apache.ambari.server.events.jpa.JPAEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.JPAEventPublisher;
 import org.apache.ambari.server.orm.RequiresSession;
 import org.apache.ambari.server.orm.cache.HostConfigMapping;
 import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
@@ -294,6 +297,12 @@ public class ClusterImpl implements Cluster {
    */
   private AmbariEventPublisher eventPublisher;
 
+  /**
+   * Used for broadcasting {@link JPAEvent}s.
+   */
+  @Inject
+  private JPAEventPublisher jpaEventPublisher;
+
   /**
    * A simple cache for looking up {@code cluster-env} properties for a cluster.
    * This map is changed whenever {{cluster-env}} is changed and we receive a
@@ -3302,6 +3311,7 @@ public class ClusterImpl implements Cluster {
   @Override
   public void applyLatestConfigurations(StackId stackId) {
     clusterGlobalLock.writeLock().lock();
+
     try {
       ClusterEntity clusterEntity = getClusterEntity();
       Collection<ClusterConfigMappingEntity> configMappingEntities = clusterEntity.getConfigMappingEntities();
@@ -3320,16 +3330,18 @@ public class ClusterImpl implements Cluster {
 
       // loop through all configs and set the latest to enabled for the
       // specified stack
-      for(ClusterConfigMappingEntity e: configMappingEntities){
-        String type = e.getType();
-        String tag =  e.getTag();
+      for(ClusterConfigMappingEntity configMappingEntity: configMappingEntities){
+        String type = configMappingEntity.getType();
+        String tag =  configMappingEntity.getTag();
 
         for (ClusterConfigMappingEntity latest : latestConfigMappingByStack) {
-          String t = latest.getType();
-          String tagLatest = latest.getTag();
-          if(type.equals(t) && tag.equals(tagLatest) ){//find the latest config of a given mapping entity
+          String latestType = latest.getType();
+          String latestTag = latest.getTag();
+
+          // find the latest config of a given mapping entity
+          if (StringUtils.equals(type, latestType) && StringUtils.equals(tag, latestTag)) {
             LOG.info("{} with version tag {} is selected for stack {}", type, tag, stackId.toString());
-            e.setSelected(1);
+            configMappingEntity.setSelected(1);
           }
         }
       }
@@ -3342,6 +3354,17 @@ public class ClusterImpl implements Cluster {
     } finally {
       clusterGlobalLock.writeLock().unlock();
     }
+
+    LOG.info(
+        "Applied latest configurations for {} on stack {}. The desired configurations are now {}",
+        getClusterName(), stackId, getDesiredConfigs());
+
+    // publish an event to instruct entity managers to clear cached instances of
+    // ClusterEntity immediately - it takes EclipseLink about 1000ms to update
+    // the L1 caches of other threads and the action scheduler could act upon
+    // stale data
+    EntityManagerCacheInvalidationEvent event = new EntityManagerCacheInvalidationEvent();
+    jpaEventPublisher.publish(event);
   }
 
   public Collection<ClusterConfigMappingEntity> getLatestConfigMapping(List<ClusterConfigMappingEntity> clusterConfigMappingEntities){

+ 50 - 0
ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ActionManagerTestHelper.java

@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.actionmanager;
+
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.createNiceMock;
+
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+
+/**
+ * Provides supporting test methods for dealing with the {@link ActionManager}.
+ */
+public class ActionManagerTestHelper {
+
+  @Inject
+  private ActionDBAccessor actionDBAccessor;
+
+  @Inject
+  private Injector injector;
+
+  /**
+   * Gets an instance of the {@link ActionManager} with the
+   * {@link ActionManager#getTasks(java.util.Collection)} method mocked.
+   *
+   * @return
+   */
+  public ActionManager getMockActionManager() {
+    ActionManager actionManager = createMockBuilder(ActionManager.class).addMockedMethod(
+        "getTasks").withConstructor(actionDBAccessor, injector.getInstance(RequestFactory.class),
+            createNiceMock(ActionScheduler.class)).createMock();
+
+    return actionManager;
+  }
+}

+ 1 - 7
ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java

@@ -31,13 +31,11 @@ import javax.persistence.EntityManager;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
-import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.api.services.BaseRequest;
 import org.apache.ambari.server.audit.AuditLogger;
 import org.apache.ambari.server.configuration.Configuration;
-import org.apache.ambari.server.controller.HostsMap;
 import org.apache.ambari.server.controller.internal.RequestResourceFilter;
 import org.apache.ambari.server.orm.DBAccessor;
 import org.apache.ambari.server.orm.DBAccessorImpl;
@@ -65,7 +63,6 @@ import com.google.inject.Inject;
 import com.google.inject.Injector;
 import com.google.inject.Singleton;
 import com.google.inject.persist.PersistService;
-import com.google.inject.persist.UnitOfWork;
 import com.google.inject.util.Modules;
 
 import junit.framework.Assert;
@@ -123,10 +120,7 @@ public class TestActionDBAccessorImpl {
     clusters.addCluster(clusterName, stackId);
     db = injector.getInstance(ActionDBAccessorImpl.class);
 
-    am = new ActionManager(5000, 1200000, new ActionQueue(), clusters, db,
-        new HostsMap((String) null), injector.getInstance(UnitOfWork.class),
-
-		injector.getInstance(RequestFactory.class), null, null);
+    am = injector.getInstance(ActionManager.class);
 
     EasyMock.replay(injector.getInstance(AuditLogger.class));
   }

+ 6 - 11
ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java

@@ -39,6 +39,7 @@ import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.audit.AuditLogger;
 import org.apache.ambari.server.controller.HostsMap;
+import org.apache.ambari.server.events.publishers.JPAEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.state.Clusters;
@@ -94,9 +95,7 @@ public class TestActionManager {
   @Test
   public void testActionResponse() throws AmbariException {
     ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
-    ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(),
-        clusters, db, new HostsMap((String) null), unitOfWork,
-        injector.getInstance(RequestFactory.class), null, null);
+    ActionManager am = injector.getInstance(ActionManager.class);
     populateActionDB(db, hostname);
     Stage stage = db.getAllStages(requestId).get(0);
     Assert.assertEquals(stageId, stage.getStageId());
@@ -137,9 +136,7 @@ public class TestActionManager {
   @Test
   public void testActionResponsesUnsorted() throws AmbariException {
     ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
-    ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(),
-        clusters, db, new HostsMap((String) null), unitOfWork,
-        injector.getInstance(RequestFactory.class), null, null);
+    ActionManager am = injector.getInstance(ActionManager.class);
     populateActionDBWithTwoCommands(db, hostname);
     Stage stage = db.getAllStages(requestId).get(0);
     Assert.assertEquals(stageId, stage.getStageId());
@@ -176,9 +173,7 @@ public class TestActionManager {
   @Test
   public void testLargeLogs() throws AmbariException {
     ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
-    ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(),
-        clusters, db, new HostsMap((String) null), unitOfWork,
-        injector.getInstance(RequestFactory.class), null, null);
+    ActionManager am = injector.getInstance(ActionManager.class);
     populateActionDB(db, hostname);
     Stage stage = db.getAllStages(requestId).get(0);
     Assert.assertEquals(stageId, stage.getStageId());
@@ -284,8 +279,8 @@ public class TestActionManager {
 
     replay(queue, db, clusters);
 
-    ActionManager manager = new ActionManager(0, 0, queue, clusters, db, null, unitOfWork,
-        injector.getInstance(RequestFactory.class), null, null);
+    ActionScheduler actionScheduler = new ActionScheduler(0, 0, db, createNiceMock(JPAEventPublisher.class));
+    ActionManager manager = new ActionManager(db, injector.getInstance(RequestFactory.class), actionScheduler);
     assertSame(listStages, manager.getActions(requestId));
 
     verify(queue, db, clusters);

+ 49 - 37
ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java

@@ -54,6 +54,8 @@ import java.util.TreeMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import javax.persistence.EntityManager;
+
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
@@ -107,6 +109,7 @@ import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
+import com.google.inject.Provider;
 import com.google.inject.persist.PersistService;
 import com.google.inject.persist.UnitOfWork;
 
@@ -139,11 +142,16 @@ public class TestActionScheduler {
   @Inject
   HostDAO hostDAO;
 
+  Provider<EntityManager> entityManagerProviderMock = EasyMock.niceMock(Provider.class);
+
   @Before
   public void setup() throws Exception {
     injector = Guice.createInjector(new InMemoryDefaultTestModule());
     injector.getInstance(GuiceJpaInitializer.class);
     injector.injectMembers(this);
+
+    expect(entityManagerProviderMock.get()).andReturn(null);
+    replay(entityManagerProviderMock);
   }
 
   @After
@@ -207,7 +215,7 @@ public class TestActionScheduler {
     //Keep large number of attempts so that the task is not expired finally
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 5, db, aq, fsm,
-        10000, new HostsMap((String) null), unitOfWork, null, conf);
+        10000, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock);
     scheduler.setTaskTimeoutAdjustment(false);
 
     List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler);
@@ -230,6 +238,8 @@ public class TestActionScheduler {
 
     //Wait for sometime, it shouldn't be scheduled this time.
     ac = waitForQueueSize(hostname, aq, 0, scheduler);
+
+    EasyMock.verify(entityManagerProviderMock);
   }
 
   private List<AgentCommand> waitForQueueSize(String hostname, ActionQueue aq,
@@ -318,7 +328,7 @@ public class TestActionScheduler {
 
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 0, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf);
+        new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock);
     scheduler.setTaskTimeoutAdjustment(false);
     // Start the thread
 
@@ -346,6 +356,7 @@ public class TestActionScheduler {
     verify(db, times(1)).startRequest(eq(1L));
     verify(db, times(1)).abortOperation(1L);
 
+    EasyMock.verify(entityManagerProviderMock);
   }
 
   @Test
@@ -402,7 +413,7 @@ public class TestActionScheduler {
     AmbariEventPublisher aep = EasyMock.createNiceMock(AmbariEventPublisher.class);
     ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class).
         withConstructor((long) 100, (long) 50, db, aq, fsm, 3,
-            new HostsMap((String) null), unitOfWork, aep, conf).
+            new HostsMap((String) null), unitOfWork, aep, conf, entityManagerProviderMock).
         addMockedMethod("cancelHostRoleCommands").
         createMock();
     scheduler.cancelHostRoleCommands((Collection<HostRoleCommand>)EasyMock.anyObject(),EasyMock.anyObject(String.class));
@@ -418,7 +429,7 @@ public class TestActionScheduler {
 
     Assert.assertEquals(HostRoleStatus.TIMEDOUT,stages.get(0).getHostRoleStatus(hostname, "NAMENODE"));
 
-    EasyMock.verify(scheduler);
+    EasyMock.verify(scheduler, entityManagerProviderMock);
   }
 
   @Test
@@ -525,7 +536,7 @@ public class TestActionScheduler {
 
     // Make sure the NN install doesn't timeout
     ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf);
+        new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock);
     scheduler.setTaskTimeoutAdjustment(false);
 
     int cycleCount=0;
@@ -639,7 +650,7 @@ public class TestActionScheduler {
 
     ServerActionExecutor.init(injector);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf);
+        new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock);
 
     int cycleCount = 0;
     while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
@@ -715,7 +726,8 @@ public class TestActionScheduler {
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null),
-        unitOfWork, EasyMock.createNiceMock(AmbariEventPublisher.class), conf);
+        unitOfWork, EasyMock.createNiceMock(AmbariEventPublisher.class), conf,
+        entityManagerProviderMock);
 
     scheduler.doWork();
 
@@ -795,7 +807,7 @@ public class TestActionScheduler {
 
     ServerActionExecutor.init(injector);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf);
+        new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock);
 
     int cycleCount = 0;
     while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION").isCompletedState()
@@ -815,8 +827,8 @@ public class TestActionScheduler {
 
     ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class)
       .withConstructor(long.class, long.class, ActionDBAccessor.class, ActionQueue.class, Clusters.class, int.class,
-        HostsMap.class, UnitOfWork.class, AmbariEventPublisher.class, Configuration.class)
-      .withArgs(100L, 50L, null, null, null, -1, null, null, null, null)
+        HostsMap.class, UnitOfWork.class, AmbariEventPublisher.class, Configuration.class, Provider.class)
+      .withArgs(100L, 50L, null, null, null, -1, null, null, null, null, entityManagerProviderMock)
       .createNiceMock();
 
     EasyMock.replay(scheduler);
@@ -923,8 +935,9 @@ public class TestActionScheduler {
 
     ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class)
       .withConstructor(long.class, long.class, ActionDBAccessor.class, ActionQueue.class, Clusters.class, int.class,
-        HostsMap.class, UnitOfWork.class, AmbariEventPublisher.class, Configuration.class)
-      .withArgs(100L, 50L, db, aq, fsm, -1, null, null, ambariEventPublisher, null)
+        HostsMap.class, UnitOfWork.class, AmbariEventPublisher.class, Configuration.class, Provider.class)
+        .withArgs(100L, 50L, db, aq, fsm, -1, null, null, ambariEventPublisher, null,
+            entityManagerProviderMock)
       .createNiceMock();
 
     EasyMock.replay(scheduler, fsm, host, db, cluster, ambariEventPublisher, service, serviceComponent, serviceComponentHost);
@@ -1002,7 +1015,7 @@ public class TestActionScheduler {
     }).when(db).getTasksByRoleAndStatus(anyString(), any(HostRoleStatus.class));
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf);
+        new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock);
 
     int cycleCount = 0;
     while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
@@ -1109,7 +1122,7 @@ public class TestActionScheduler {
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
-            new HostsMap((String) null), unitOfWork, null, conf));
+        new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock));
 
     doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
 
@@ -1200,7 +1213,7 @@ public class TestActionScheduler {
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
             new HostsMap((String) null),
-            unitOfWork, null, conf));
+        unitOfWork, null, conf, entityManagerProviderMock));
 
 
     doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
@@ -1274,7 +1287,7 @@ public class TestActionScheduler {
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null),
-        unitOfWork, null, conf));
+        unitOfWork, null, conf, entityManagerProviderMock));
 
     doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
 
@@ -1395,7 +1408,8 @@ public class TestActionScheduler {
     ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class).
         withConstructor((long)100, (long)50, db, aq, fsm, 3,
           new HostsMap((String) null),
-          unitOfWork, EasyMock.createNiceMock(AmbariEventPublisher.class), conf).
+            unitOfWork, EasyMock.createNiceMock(AmbariEventPublisher.class), conf,
+            entityManagerProviderMock).
           addMockedMethod("cancelHostRoleCommands").
           createMock();
     scheduler.cancelHostRoleCommands(EasyMock.capture(cancelCommandList),
@@ -1403,9 +1417,7 @@ public class TestActionScheduler {
     EasyMock.expectLastCall().once();
     EasyMock.replay(scheduler);
 
-    ActionManager am = new ActionManager(
-        2, 2, aq, fsm, db, new HostsMap((String) null), unitOfWork, requestFactory, conf,
-            EasyMock.createNiceMock(AmbariEventPublisher.class));
+    ActionManager am = new ActionManager(db, requestFactory, scheduler);
 
     scheduler.doWork();
 
@@ -1417,7 +1429,7 @@ public class TestActionScheduler {
     Assert.assertEquals(HostRoleStatus.FAILED, stages.get(0).getHostRoleStatus(hostname, "NAMENODE"));
     Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(1).getHostRoleStatus(hostname, "DATANODE"));
     Assert.assertEquals(cancelCommandList.getValue().size(), 1);
-    EasyMock.verify(scheduler);
+    EasyMock.verify(scheduler, entityManagerProviderMock);
   }
 
   /**
@@ -1572,7 +1584,7 @@ public class TestActionScheduler {
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = new ActionScheduler(100, 10000, db, aq, fsm, 3,
         new HostsMap((String) null),
-        unitOfWork, null, conf);
+        unitOfWork, null, conf, entityManagerProviderMock);
 
     scheduler.doWork();
 
@@ -1756,9 +1768,9 @@ public class TestActionScheduler {
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null),
-        unitOfWork, null, conf);
-    ActionManager am = new ActionManager(
-        2, 2, aq, fsm, db, new HostsMap((String) null), unitOfWork, requestFactory, conf, null);
+        unitOfWork, null, conf, entityManagerProviderMock);
+
+    ActionManager am = new ActionManager(db, requestFactory, scheduler);
 
     scheduler.doWork();
 
@@ -1940,7 +1952,7 @@ public class TestActionScheduler {
     //Keep large number of attempts so that the task is not expired finally
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm,
-        10000, new HostsMap((String) null), unitOfWork, null, conf);
+        10000, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock);
     scheduler.setTaskTimeoutAdjustment(false);
 
     List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler);
@@ -2023,7 +2035,7 @@ public class TestActionScheduler {
     when(db.getStagesInProgress()).thenReturn(stages);
 
     ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
-            new HostsMap((String) null), unitOfWork, null, conf);
+        new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock);
 
     final CountDownLatch abortCalls = new CountDownLatch(2);
 
@@ -2132,7 +2144,7 @@ public class TestActionScheduler {
 
     ServerActionExecutor.init(injector);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf);
+        new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock);
 
     int cycleCount = 0;
     while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
@@ -2266,7 +2278,7 @@ public class TestActionScheduler {
     Configuration conf = new Configuration(properties);
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf);
+        new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock);
 
     scheduler.doWork();
 
@@ -2426,7 +2438,7 @@ public class TestActionScheduler {
     Configuration conf = new Configuration(properties);
 
     ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf));
+        new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock));
 
     doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
 
@@ -2486,7 +2498,7 @@ public class TestActionScheduler {
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null),
-        unitOfWork, null, conf);
+        unitOfWork, null, conf, entityManagerProviderMock);
 
     HostRoleCommand hrc1 = hostRoleCommandFactory.create("h1", Role.NAMENODE, null, RoleCommand.EXECUTE);
     hrc1.setStatus(HostRoleStatus.COMPLETED);
@@ -2518,7 +2530,7 @@ public class TestActionScheduler {
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null),
-        unitOfWork, null, conf);
+        unitOfWork, null, conf, entityManagerProviderMock);
 
     HostRoleCommand hrc1 = hostRoleCommandFactory.create("h1", Role.NAMENODE, null, RoleCommand.EXECUTE);
     hrc1.setStatus(HostRoleStatus.COMPLETED);
@@ -2657,7 +2669,7 @@ public class TestActionScheduler {
     }).when(db).abortOperation(anyLong());
 
     ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf));
+        new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock));
 
     doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString());
 
@@ -2675,6 +2687,7 @@ public class TestActionScheduler {
     Assert.assertEquals(HostRoleStatus.PENDING,
         stages.get(1).getHostRoleStatus(hostname1, "DATANODE"));
 
+    EasyMock.verify(entityManagerProviderMock);
   }
 
   @Test
@@ -2703,7 +2716,7 @@ public class TestActionScheduler {
     expect(previousStage.getSuccessFactor(Role.DATANODE)).andReturn(0.5F);
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, actionDBAccessor, null, null, 3,
-      new HostsMap((String) null), null, null, null);
+        new HostsMap((String) null), null, null, null, entityManagerProviderMock);
 
     replay(previousStage, nextStage, actionDBAccessor, hostRoleCommand);
 
@@ -2722,7 +2735,7 @@ public class TestActionScheduler {
     expect(nextStage.getStageId()).andReturn(0L);
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, null, null, null, 3,
-      new HostsMap((String) null), null, null, null);
+        new HostsMap((String) null), null, null, null, entityManagerProviderMock);
 
     replay(nextStage);
 
@@ -2760,7 +2773,7 @@ public class TestActionScheduler {
     expect(previousStage.getSuccessFactor(Role.DATANODE)).andReturn(0.5F);
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, actionDBAccessor, null, null, 3,
-      new HostsMap((String) null), null, null, null);
+        new HostsMap((String) null), null, null, null, entityManagerProviderMock);
 
     replay(previousStage, nextStage, actionDBAccessor, hostRoleCommand);
 
@@ -2778,5 +2791,4 @@ public class TestActionScheduler {
       bind(Clusters.class).toInstance(mock(Clusters.class));
     }
   }
-
 }

+ 270 - 0
ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionSchedulerThreading.java

@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.actionmanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Semaphore;
+
+import javax.persistence.EntityManager;
+
+import org.apache.ambari.server.events.publishers.JPAEventPublisher;
+import org.apache.ambari.server.orm.GuiceJpaInitializer;
+import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.OrmTestHelper;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.state.ConfigImpl;
+import org.apache.ambari.server.state.DesiredConfig;
+import org.apache.ambari.server.state.StackId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.persist.PersistService;
+import com.google.inject.persist.UnitOfWork;
+
+import junit.framework.Assert;
+
+/**
+ * Tests {@link ActionScheduler}, focusing on multi-threaded concerns.
+ */
+public class TestActionSchedulerThreading {
+
+  private static Injector injector;
+
+  private Clusters clusters;
+  private OrmTestHelper ormTestHelper;
+
+  /**
+   * Setup test methods.
+   *
+   * @throws Exception
+   */
+  @Before
+  public void setup() throws Exception {
+    injector = Guice.createInjector(new InMemoryDefaultTestModule());
+    injector.getInstance(GuiceJpaInitializer.class);
+    clusters = injector.getInstance(Clusters.class);
+    ormTestHelper = injector.getInstance(OrmTestHelper.class);
+  }
+
+  /**
+   * Cleanup test methods.
+   */
+  @After
+  public void teardown() {
+    injector.getInstance(PersistService.class).stop();
+  }
+
+  /**
+   * Tests that applying configurations for a given stack correctly sets
+   * {@link DesiredConfig}s. This takes into account EclipseLink caching issues
+   * by specifically checking the actionscheduler headless thread.
+   */
+  @Test
+  public void testDesiredConfigurationsAfterApplyingLatestForStackInOtherThreads()
+      throws Exception {
+    long clusterId = ormTestHelper.createCluster(UUID.randomUUID().toString());
+    Cluster cluster = clusters.getCluster(clusterId);
+    ormTestHelper.addHost(clusters, cluster, "h1");
+
+    StackId stackId = cluster.getCurrentStackVersion();
+    StackId newStackId = new StackId("HDP-2.2.0");
+
+    // make sure the stacks are different
+    Assert.assertFalse(stackId.equals(newStackId));
+
+    Map<String, String> properties = new HashMap<String, String>();
+    Map<String, Map<String, String>> propertiesAttributes = new HashMap<String, Map<String, String>>();
+
+    // foo-type for v1 on current stack
+    properties.put("foo-property-1", "foo-value-1");
+    Config c1 = new ConfigImpl(cluster, "foo-type", properties, propertiesAttributes, injector);
+    c1.setTag("version-1");
+    c1.setStackId(stackId);
+    c1.setVersion(1L);
+
+    cluster.addConfig(c1);
+    c1.persist();
+
+    // make v1 "current"
+    cluster.addDesiredConfig("admin", Sets.newHashSet(c1), "note-1");
+
+    // bump the stack
+    cluster.setDesiredStackVersion(newStackId);
+
+    // save v2
+    // foo-type for v2 on new stack
+    properties.put("foo-property-2", "foo-value-2");
+    Config c2 = new ConfigImpl(cluster, "foo-type", properties, propertiesAttributes, injector);
+    c2.setTag("version-2");
+    c2.setStackId(newStackId);
+    c2.setVersion(2L);
+    cluster.addConfig(c2);
+    c2.persist();
+
+    // make v2 "current"
+    cluster.addDesiredConfig("admin", Sets.newHashSet(c2), "note-2");
+
+    // check desired config
+    Map<String, DesiredConfig> desiredConfigs = cluster.getDesiredConfigs();
+    DesiredConfig desiredConfig = desiredConfigs.get("foo-type");
+    desiredConfig = desiredConfigs.get("foo-type");
+    assertNotNull(desiredConfig);
+    assertEquals(Long.valueOf(2), desiredConfig.getVersion());
+    assertEquals("version-2", desiredConfig.getTag());
+
+    final String hostName = cluster.getHosts().iterator().next().getHostName();
+
+    // move the stack back to the old stack
+    cluster.setDesiredStackVersion(stackId);
+
+    // create the semaphores, taking 1 from each to make them blocking from the
+    // start
+    Semaphore applyLatestConfigsSemaphore = new Semaphore(1, true);
+    Semaphore threadInitialCachingSemaphore = new Semaphore(1, true);
+    threadInitialCachingSemaphore.acquire();
+    applyLatestConfigsSemaphore.acquire();
+
+    final InstrumentedActionScheduler runnable = new InstrumentedActionScheduler(clusterId, hostName,
+        threadInitialCachingSemaphore, applyLatestConfigsSemaphore);
+
+    injector.injectMembers(runnable);
+
+    final Thread thread = new Thread(runnable);
+
+    // start the thread to populate the data in it, waiting to ensure that it
+    // finishes the calls it needs to make
+    thread.start();
+    threadInitialCachingSemaphore.acquire();
+
+    // apply the configs for the old stack
+    cluster.applyLatestConfigurations(stackId);
+
+    // wake the thread up and have it verify that it can see the updated configs
+    applyLatestConfigsSemaphore.release();
+
+    // wait for the thread to finish
+    thread.join();
+
+    // if the thread failed, then fail this test
+    runnable.validateAssertions();
+  }
+
+  /**
+   * Checks the desired configurations of a cluster in a separate thread in
+   * order to test whether the {@link EntityManager} has evicted stale entries.
+   */
+  private final static class InstrumentedActionScheduler extends ActionScheduler {
+    private final long clusterId;
+    private final Semaphore threadInitialCachingSemaphore;
+    private final Semaphore applyLatestConfigsSemaphore;
+    private final String hostName;
+    private Throwable throwable = null;
+
+    @Inject
+    private ConfigHelper configHelper;
+
+    @Inject
+    private Clusters clusters;
+
+    @Inject
+    private UnitOfWork unitOfWork;
+
+    /**
+     * Constructor.
+     *
+     * @param clusterId
+     * @param hostName
+     * @param threadInitialCachingSemaphore
+     * @param applyLatestConfigsSemaphore
+     */
+    private InstrumentedActionScheduler(long clusterId, String hostName,
+        Semaphore threadInitialCachingSemaphore, Semaphore applyLatestConfigsSemaphore) {
+
+      super(1000, 1000, injector.getInstance(ActionDBAccessor.class),
+          injector.getInstance(JPAEventPublisher.class));
+
+      this.clusterId = clusterId;
+      this.threadInitialCachingSemaphore = threadInitialCachingSemaphore;
+      this.applyLatestConfigsSemaphore = applyLatestConfigsSemaphore;
+      this.hostName = hostName;
+    }
+
+    /**
+     *
+     */
+    @Override
+    public void run() {
+      unitOfWork.begin();
+      try {
+        // this is an important line - it replicates what the real scheduler
+        // does during its work routine by grabbing the current thread's
+        // EntityManager so the event can property evict entries
+        threadEntityManager = entityManagerProvider.get();
+
+        // first get the configs in order to cache the entities in this thread's
+        // L1 cache
+        Cluster cluster = clusters.getCluster(clusterId);
+
+        // {foo-type={tag=version-2}}
+        Map<String, Map<String, String>> effectiveDesiredTags = configHelper.getEffectiveDesiredTags(
+            cluster, hostName);
+
+        assertEquals("version-2", effectiveDesiredTags.get("foo-type").get("tag"));
+
+        // signal the caller that we're done making our initial call to populate
+        // the EntityManager
+        threadInitialCachingSemaphore.release();
+
+        // wait for the method to switch configs
+        applyLatestConfigsSemaphore.acquire();
+
+        // {foo-type={tag=version-1}}
+        effectiveDesiredTags = configHelper.getEffectiveDesiredTags(cluster, hostName);
+        assertEquals("version-1", effectiveDesiredTags.get("foo-type").get("tag"));
+      } catch (Throwable throwable) {
+        this.throwable = throwable;
+      } finally {
+        applyLatestConfigsSemaphore.release();
+        unitOfWork.end();
+      }
+    }
+
+    /**
+     * Raise an assertion if the thread failed.
+     */
+    private void validateAssertions() {
+      if (null != throwable) {
+        throw new AssertionError(throwable);
+      }
+    }
+  }
+
+}

+ 52 - 49
ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java

@@ -17,30 +17,48 @@
  */
 package org.apache.ambari.server.agent;
 
-import com.google.gson.JsonObject;
-import com.google.inject.Guice;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.persist.PersistService;
-import com.google.inject.persist.UnitOfWork;
-import junit.framework.Assert;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DATANODE;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyCluster;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyHostStatus;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyHostname1;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyOSRelease;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyOs;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyOsType;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyStackId;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HBASE_MASTER;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HDFS;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HDFS_CLIENT;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.NAMENODE;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.SECONDARY_NAMENODE;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.ActionDBAccessor;
 import org.apache.ambari.server.actionmanager.ActionDBAccessorImpl;
 import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.ActionManagerTestHelper;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.actionmanager.Request;
-import org.apache.ambari.server.actionmanager.RequestFactory;
 import org.apache.ambari.server.actionmanager.Stage;
 import org.apache.ambari.server.actionmanager.StageFactory;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.audit.AuditLogger;
 import org.apache.ambari.server.configuration.Configuration;
-import org.apache.ambari.server.controller.HostsMap;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.dao.HostDAO;
@@ -71,31 +89,14 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import com.google.gson.JsonObject;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.persist.PersistService;
+import com.google.inject.persist.UnitOfWork;
 
-import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DATANODE;
-import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyCluster;
-import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyHostStatus;
-import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyHostname1;
-import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyOSRelease;
-import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyOs;
-import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyOsType;
-import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyStackId;
-import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HBASE_MASTER;
-import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HDFS;
-import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HDFS_CLIENT;
-import static org.apache.ambari.server.agent.DummyHeartbeatConstants.NAMENODE;
-import static org.apache.ambari.server.agent.DummyHeartbeatConstants.SECONDARY_NAMENODE;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import junit.framework.Assert;
 
 public class HeartbeatProcessorTest {
 
@@ -115,6 +116,9 @@ public class HeartbeatProcessorTest {
   @Inject
   HeartbeatTestHelper heartbeatTestHelper;
 
+  @Inject
+  ActionManagerTestHelper actionManagerTestHelper;
+
   @Inject
   private HostRoleCommandFactory hostRoleCommandFactory;
 
@@ -200,7 +204,7 @@ public class HeartbeatProcessorTest {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
         Role.DATANODE, null, null);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
         new ArrayList<HostRoleCommand>() {{
           add(command);
@@ -263,7 +267,7 @@ public class HeartbeatProcessorTest {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
         Role.DATANODE, null, null);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
         new ArrayList<HostRoleCommand>() {{
           add(command);
@@ -348,7 +352,7 @@ public class HeartbeatProcessorTest {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
         Role.DATANODE, null, null);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
         new ArrayList<HostRoleCommand>() {{
           add(command);
@@ -431,7 +435,7 @@ public class HeartbeatProcessorTest {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
         Role.DATANODE, null, null);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
         new ArrayList<HostRoleCommand>() {{
           add(command);
@@ -507,7 +511,7 @@ public class HeartbeatProcessorTest {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
         Role.DATANODE, null, null);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
         new ArrayList<HostRoleCommand>() {{
           add(command);
@@ -541,8 +545,7 @@ public class HeartbeatProcessorTest {
     clusters.addCluster(DummyCluster, dummyStackId);
 
     ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
-    ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(), clusters, db,
-        new HostsMap((String) null), unitOfWork, injector.getInstance(RequestFactory.class), null, null);
+    ActionManager am = injector.getInstance(ActionManager.class);
     heartbeatTestHelper.populateActionDB(db, DummyHostname1, requestId, stageId);
     Stage stage = db.getAllStages(requestId).get(0);
     Assert.assertEquals(stageId, stage.getStageId());
@@ -624,7 +627,7 @@ public class HeartbeatProcessorTest {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
         Role.DATANODE, null, null);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
         new ArrayList<HostRoleCommand>() {{
           add(command);
@@ -744,7 +747,7 @@ public class HeartbeatProcessorTest {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
         Role.DATANODE, null, null);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
         new ArrayList<HostRoleCommand>() {{
           add(command);
@@ -848,7 +851,7 @@ public class HeartbeatProcessorTest {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
         Role.DATANODE, null, null);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
         new ArrayList<HostRoleCommand>() {{
           add(command);
@@ -946,7 +949,7 @@ public class HeartbeatProcessorTest {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
         Role.DATANODE, null, null);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
         new ArrayList<HostRoleCommand>() {{
           add(command);
@@ -1062,7 +1065,7 @@ public class HeartbeatProcessorTest {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
         Role.DATANODE, null, null);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
         new ArrayList<HostRoleCommand>() {{
           add(command);
@@ -1152,7 +1155,7 @@ public class HeartbeatProcessorTest {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
         Role.DATANODE, null, null);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
         new ArrayList<HostRoleCommand>() {{
           add(command);
@@ -1180,7 +1183,7 @@ public class HeartbeatProcessorTest {
   @Test
   @SuppressWarnings("unchecked")
   public void testHeartBeatWithAlertAndInvalidCluster() throws Exception {
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
 
     expect(am.getTasks(anyObject(List.class))).andReturn(
         new ArrayList<HostRoleCommand>());
@@ -1243,7 +1246,7 @@ public class HeartbeatProcessorTest {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
         Role.DATANODE, null, null);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
         Collections.singletonList(command)).anyTimes();
     replay(am);
@@ -1351,7 +1354,7 @@ public class HeartbeatProcessorTest {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
         Role.DATANODE, null, null);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
         new ArrayList<HostRoleCommand>() {{
           add(command);

+ 0 - 20
ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatTestHelper.java

@@ -23,8 +23,6 @@ import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyOSRele
 import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyOs;
 import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyStackId;
 import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HBASE;
-import static org.easymock.EasyMock.createMockBuilder;
-import static org.easymock.EasyMock.createNiceMock;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -40,13 +38,9 @@ import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.ActionDBAccessor;
 import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.actionmanager.Request;
-import org.apache.ambari.server.actionmanager.RequestFactory;
 import org.apache.ambari.server.actionmanager.Stage;
 import org.apache.ambari.server.actionmanager.StageFactory;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
-import org.apache.ambari.server.configuration.Configuration;
-import org.apache.ambari.server.controller.HostsMap;
-import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.OrmTestHelper;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
@@ -140,20 +134,6 @@ public class HeartbeatTestHelper {
     return handler;
   }
 
-  public ActionManager getMockActionManager() {
-    ActionQueue actionQueueMock = createNiceMock(ActionQueue.class);
-    Clusters clustersMock = createNiceMock(Clusters.class);
-    Configuration configurationMock = createNiceMock(Configuration.class);
-
-    ActionManager actionManager = createMockBuilder(ActionManager.class).
-        addMockedMethod("getTasks").
-        withConstructor((long)0, (long)0, actionQueueMock, clustersMock,
-            actionDBAccessor, new HostsMap((String) null), unitOfWork,
-            injector.getInstance(RequestFactory.class), configurationMock, createNiceMock(AmbariEventPublisher.class)).
-        createMock();
-    return actionManager;
-  }
-
   public Cluster getDummyCluster()
       throws AmbariException {
     Map<String, String> configProperties = new HashMap<String, String>() {{

+ 29 - 29
ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java

@@ -36,6 +36,7 @@ import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.reset;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyString;
@@ -60,6 +61,7 @@ import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.ActionDBAccessor;
 import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.ActionManagerTestHelper;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
@@ -106,10 +108,8 @@ import com.google.inject.Guice;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
 import com.google.inject.persist.PersistService;
-import com.google.inject.persist.UnitOfWork;
 
 import junit.framework.Assert;
-import static org.junit.Assert.assertNull;
 
 public class TestHeartbeatHandler {
 
@@ -138,9 +138,10 @@ public class TestHeartbeatHandler {
   HeartbeatTestHelper heartbeatTestHelper;
 
   @Inject
-  AuditLogger auditLogger;
+  ActionManagerTestHelper actionManagerTestHelper;
 
-  private UnitOfWork unitOfWork;
+  @Inject
+  AuditLogger auditLogger;
 
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -155,8 +156,6 @@ public class TestHeartbeatHandler {
     injector.getInstance(GuiceJpaInitializer.class);
     clusters = injector.getInstance(Clusters.class);
     injector.injectMembers(this);
-    log.debug("Using server os type=" + config.getServerOsType());
-    unitOfWork = injector.getInstance(UnitOfWork.class);
     EasyMock.replay(auditLogger);
   }
 
@@ -169,7 +168,7 @@ public class TestHeartbeatHandler {
   @Test
   @SuppressWarnings("unchecked")
   public void testHeartbeat() throws Exception {
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(new ArrayList<HostRoleCommand>());
     replay(am);
     Clusters fsm = clusters;
@@ -241,7 +240,7 @@ public class TestHeartbeatHandler {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
             Role.DATANODE, null, null);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
             new ArrayList<HostRoleCommand>() {{
               add(command);
@@ -326,7 +325,7 @@ public class TestHeartbeatHandler {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
             Role.DATANODE, null, null);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
             new ArrayList<HostRoleCommand>() {{
               add(command);
@@ -348,7 +347,7 @@ public class TestHeartbeatHandler {
   @Test
   public void testRegistration() throws AmbariException,
       InvalidStateTransitionException {
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     replay(am);
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
@@ -379,7 +378,7 @@ public class TestHeartbeatHandler {
   @Test
   public void testRegistrationRecoveryConfig() throws AmbariException,
       InvalidStateTransitionException {
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     replay(am);
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
@@ -443,7 +442,7 @@ public class TestHeartbeatHandler {
   @Test
   public void testRegistrationRecoveryConfigMaintenanceMode()
           throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     replay(am);
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
@@ -498,7 +497,7 @@ public class TestHeartbeatHandler {
   @Test
   public void testRegistrationAgentConfig() throws AmbariException,
       InvalidStateTransitionException {
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     replay(am);
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
@@ -531,7 +530,7 @@ public class TestHeartbeatHandler {
   public void testRegistrationWithBadVersion() throws AmbariException,
       InvalidStateTransitionException {
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     replay(am);
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
@@ -572,7 +571,7 @@ public class TestHeartbeatHandler {
 
   @Test
   public void testRegistrationPublicHostname() throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     replay(am);
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
@@ -605,7 +604,7 @@ public class TestHeartbeatHandler {
   @Test
   public void testInvalidOSRegistration() throws AmbariException,
       InvalidStateTransitionException {
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     replay(am);
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
@@ -634,7 +633,7 @@ public class TestHeartbeatHandler {
   public void testIncompatibleAgentRegistration() throws AmbariException,
           InvalidStateTransitionException {
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     replay(am);
     Clusters fsm = clusters;
     HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
@@ -662,7 +661,7 @@ public class TestHeartbeatHandler {
   @Test
   public void testRegisterNewNode()
       throws AmbariException, InvalidStateTransitionException {
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     replay(am);
     Clusters fsm = clusters;
     fsm.addHost(DummyHostname1);
@@ -755,7 +754,7 @@ public class TestHeartbeatHandler {
     HeartbeatMonitor hm = mock(HeartbeatMonitor.class);
     when(hm.generateStatusCommands(anyString())).thenReturn(dummyCmds);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     replay(am);
     Clusters fsm = clusters;
     ActionQueue actionQueue = new ActionQueue();
@@ -824,7 +823,7 @@ public class TestHeartbeatHandler {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
             Role.DATANODE, null, RoleCommand.INSTALL);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
             new ArrayList<HostRoleCommand>() {{
               add(command);
@@ -895,7 +894,7 @@ public class TestHeartbeatHandler {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
             Role.DATANODE, null, null);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
             new ArrayList<HostRoleCommand>() {{
               add(command);
@@ -966,7 +965,7 @@ public class TestHeartbeatHandler {
     hb.setComponentStatus(componentStatuses);
 
     ActionQueue aq = new ActionQueue();
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
             new ArrayList<HostRoleCommand>() {{
             }});
@@ -1007,7 +1006,7 @@ public class TestHeartbeatHandler {
     ActionQueue aq = new ActionQueue();
 
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1, Role.DATANODE, null, null);
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
         new ArrayList<HostRoleCommand>() {{
           add(command);
@@ -1090,7 +1089,7 @@ public class TestHeartbeatHandler {
 
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
             Role.DATANODE, null, null);
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
             new ArrayList<HostRoleCommand>() {{
               add(command);
@@ -1322,7 +1321,7 @@ public class TestHeartbeatHandler {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
             Role.DATANODE, null, null);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
             new ArrayList<HostRoleCommand>() {{
               add(command);
@@ -1375,7 +1374,8 @@ public class TestHeartbeatHandler {
     expected.setComponents(dummyComponents);
 
     heartbeatTestHelper.getDummyCluster();
-    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(heartbeatTestHelper.getMockActionManager(),
+    HeartBeatHandler handler = heartbeatTestHelper.getHeartBeatHandler(
+        actionManagerTestHelper.getMockActionManager(),
         new ActionQueue());
 
     ComponentsResponse actual = handler.handleComponents(DummyCluster);
@@ -1440,7 +1440,7 @@ public class TestHeartbeatHandler {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
             Role.DATANODE, null, null);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
             new ArrayList<HostRoleCommand>() {{
               add(command);
@@ -1527,7 +1527,7 @@ public class TestHeartbeatHandler {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
         Role.DATANODE, null, null);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
         new ArrayList<HostRoleCommand>() {{
           add(command);
@@ -1558,7 +1558,7 @@ public class TestHeartbeatHandler {
     final HostRoleCommand command = hostRoleCommandFactory.create(DummyHostname1,
         Role.DATANODE, null, null);
 
-    ActionManager am = heartbeatTestHelper.getMockActionManager();
+    ActionManager am = actionManagerTestHelper.getMockActionManager();
     expect(am.getTasks(anyObject(List.class))).andReturn(
         new ArrayList<HostRoleCommand>() {{
           add(command);

+ 82 - 0
ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java

@@ -120,6 +120,7 @@ import org.mockito.ArgumentCaptor;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
@@ -2629,6 +2630,87 @@ public class ClusterTest {
     }
   }
 
+  /**
+   * Tests that applying configurations for a given stack correctly sets
+   * {@link DesiredConfig}s.
+   */
+  @Test
+  public void testDesiredConfigurationsAfterApplyingLatestForStack() throws Exception {
+    createDefaultCluster();
+    Cluster cluster = clusters.getCluster("c1");
+    StackId stackId = cluster.getCurrentStackVersion();
+    StackId newStackId = new StackId("HDP-2.2.0");
+
+    ConfigHelper configHelper = injector.getInstance(ConfigHelper.class);
+
+    // make sure the stacks are different
+    Assert.assertFalse(stackId.equals(newStackId));
+
+    Map<String, String> properties = new HashMap<String, String>();
+    Map<String, Map<String, String>> propertiesAttributes = new HashMap<String, Map<String, String>>();
+
+    // foo-type for v1 on current stack
+    properties.put("foo-property-1", "foo-value-1");
+    Config c1 = new ConfigImpl(cluster, "foo-type", properties, propertiesAttributes, injector);
+    c1.setTag("version-1");
+    c1.setStackId(stackId);
+    c1.setVersion(1L);
+
+    cluster.addConfig(c1);
+    c1.persist();
+
+    // make v1 "current"
+    cluster.addDesiredConfig("admin", Sets.newHashSet(c1), "note-1");
+
+    // bump the stack
+    cluster.setDesiredStackVersion(newStackId);
+
+    // save v2
+    // foo-type for v2 on new stack
+    properties.put("foo-property-2", "foo-value-2");
+    Config c2 = new ConfigImpl(cluster, "foo-type", properties, propertiesAttributes, injector);
+    c2.setTag("version-2");
+    c2.setStackId(newStackId);
+    c2.setVersion(2L);
+    cluster.addConfig(c2);
+    c2.persist();
+
+    // make v2 "current"
+    cluster.addDesiredConfig("admin", Sets.newHashSet(c2), "note-2");
+
+    // check desired config
+    Map<String, DesiredConfig> desiredConfigs = cluster.getDesiredConfigs();
+    DesiredConfig desiredConfig = desiredConfigs.get("foo-type");
+    desiredConfig = desiredConfigs.get("foo-type");
+    assertNotNull(desiredConfig);
+    assertEquals(Long.valueOf(2), desiredConfig.getVersion());
+    assertEquals("version-2", desiredConfig.getTag());
+
+    String hostName = cluster.getHosts().iterator().next().getHostName();
+
+    // {foo-type={tag=version-2}}
+    Map<String, Map<String, String>> effectiveDesiredTags = configHelper.getEffectiveDesiredTags(
+        cluster, hostName);
+
+    assertEquals("version-2", effectiveDesiredTags.get("foo-type").get("tag"));
+
+    // move the stack back to the old stack
+    cluster.setDesiredStackVersion(stackId);
+
+    // apply the configs for the old stack
+    cluster.applyLatestConfigurations(stackId);
+
+    // {foo-type={tag=version-1}}
+    effectiveDesiredTags = configHelper.getEffectiveDesiredTags(cluster, hostName);
+    assertEquals("version-1", effectiveDesiredTags.get("foo-type").get("tag"));
+
+    desiredConfigs = cluster.getDesiredConfigs();
+    desiredConfig = desiredConfigs.get("foo-type");
+    assertNotNull(desiredConfig);
+    assertEquals(Long.valueOf(1), desiredConfig.getVersion());
+    assertEquals("version-1", desiredConfig.getTag());
+  }
+
   /**
    * Tests removing configurations and configuration mappings by stack.
    *