Преглед изворни кода

AMBARI-3136. Reduce the size of ExecutionCommand entity. (Myroslav Papirkovskyy via swagle)

Siddharth Wagle пре 11 година
родитељ
комит
207047375c

+ 5 - 0
ambari-project/pom.xml

@@ -203,6 +203,11 @@
         <artifactId>guava</artifactId>
         <version>14.0.1</version>
       </dependency>
+      <dependency>
+        <groupId>com.google.code.findbugs</groupId>
+        <artifactId>jsr305</artifactId>
+        <version>1.3.9</version>
+      </dependency>
       <dependency>
         <groupId>org.mockito</groupId>
         <artifactId>mockito-core</artifactId>

+ 5 - 0
ambari-server/pom.xml

@@ -766,6 +766,11 @@
       <artifactId>guava</artifactId>
       <version>14.0.1</version>
     </dependency>
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+      <version>1.3.9</version>
+    </dependency>
   </dependencies>
   <!--<reporting>
         <plugins>

+ 3 - 3
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java

@@ -71,9 +71,9 @@ public class ActionManager {
   }
 
   public void sendActions(List<Stage> stages) {
-    
-    for (Stage s: stages) {
-      if (LOG.isDebugEnabled()) {
+
+    if (LOG.isDebugEnabled()) {
+      for (Stage s : stages) {
         LOG.debug("Persisting stage into db: " + s.toString());
       }
     }

+ 57 - 4
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java

@@ -17,16 +17,26 @@
  */
 package org.apache.ambari.server.actionmanager;
 
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
 import org.apache.ambari.server.utils.StageUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import javax.xml.bind.JAXBException;
-import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 public class ExecutionCommandWrapper {
   private static Log LOG = LogFactory.getLog(ExecutionCommandWrapper.class);
+  @Inject
+  static Injector injector;
+
   String jsonExecutionCommand = null;
   ExecutionCommand executionCommand = null;
 
@@ -43,8 +53,51 @@ public class ExecutionCommandWrapper {
       return executionCommand;
     } else if (jsonExecutionCommand != null) {
 //      try {
-        executionCommand = StageUtils.getGson().fromJson(jsonExecutionCommand, ExecutionCommand.class);
-        return executionCommand;
+      executionCommand = StageUtils.getGson().fromJson(jsonExecutionCommand, ExecutionCommand.class);
+
+      if (injector == null) {
+        throw new RuntimeException("Injector not found, configuration cannot be restored");
+      } else if ((executionCommand.getConfigurations() == null || executionCommand.getConfigurations().isEmpty()) &&
+          executionCommand.getConfigurationTags() != null &&
+          !executionCommand.getConfigurationTags().isEmpty()) {
+
+        Clusters clusters = injector.getInstance(Clusters.class);
+        HostRoleCommandDAO hostRoleCommandDAO = injector.getInstance(HostRoleCommandDAO.class);
+        Long clusterId = hostRoleCommandDAO.findByPK(executionCommand.getTaskId()).getStage().getCluster().getClusterId();
+
+        try {
+          Cluster cluster = clusters.getClusterById(clusterId);
+          Map<String, Map<String, String>> configurations = new HashMap<String, Map<String, String>>();
+
+          for (Map.Entry<String, Map<String, String>> entry : executionCommand.getConfigurationTags().entrySet()) {
+            String type = entry.getKey();
+            Map<String, String> tags = entry.getValue();
+
+            if (!configurations.containsKey(type)) {
+              configurations.put(type, new HashMap<String, String>());
+            }
+
+            String tag;
+
+            //perform override
+            //TODO align with configs override logic
+            tag = tags.get("host_override_tag");
+            tag = tag == null ? tags.get("service_override_tag") : tag;
+            tag = tag == null ? tags.get("tag") : tag;
+
+            if (tag != null) {
+              Config config = cluster.getConfig(type, tag);
+              configurations.get(type).putAll(config.getProperties());
+            }
+          }
+
+          executionCommand.setConfigurations(configurations);
+        } catch (AmbariException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      return executionCommand;
 //      } catch (IOException e) {
 //        throw new RuntimeException(e);
 //      }

+ 4 - 1
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java

@@ -17,6 +17,8 @@
  */
 package org.apache.ambari.server.actionmanager;
 
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.AssistedInject;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
@@ -65,7 +67,8 @@ public class HostRoleCommand {
     this.roleCommand = command;
   }
 
-  public HostRoleCommand(HostRoleCommandEntity hostRoleCommandEntity, Injector injector) {
+  @AssistedInject
+  public HostRoleCommand(@Assisted HostRoleCommandEntity hostRoleCommandEntity, Injector injector) {
     taskId = hostRoleCommandEntity.getTaskId();
     stageId = hostRoleCommandEntity.getStage().getStageId();
     requestId = hostRoleCommandEntity.getStage().getRequestId();

+ 20 - 9
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java

@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.actionmanager;
 
+import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -24,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
+import com.google.gson.reflect.TypeToken;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.agent.ExecutionCommand;
@@ -44,15 +46,19 @@ import com.google.inject.Injector;
 import com.google.inject.assistedinject.Assisted;
 import com.google.inject.assistedinject.AssistedInject;
 
+import javax.annotation.Nullable;
+
 //This class encapsulates the stage. The stage encapsulates all the information
 //required to persist an action.
 public class Stage {
+
   private static Logger LOG = LoggerFactory.getLogger(Stage.class);
   private final long requestId;
   private final String clusterName;
   private long stageId = -1;
   private final String logDir;
   private final String requestContext;
+
   private int taskTimeout = -1;
   private int perTaskTimeFactor = 60000;
 
@@ -67,7 +73,7 @@ public class Stage {
 
   @AssistedInject
   public Stage(@Assisted long requestId, @Assisted("logDir") String logDir, @Assisted("clusterName") String clusterName,
-               @Assisted("requestContext") String requestContext) {
+               @Assisted("requestContext") @Nullable String requestContext) {
     this.requestId = requestId;
     this.logDir = logDir;
     this.clusterName = clusterName;
@@ -95,14 +101,19 @@ public class Stage {
     clusterName = stageEntity.getCluster().getClusterName();
     requestContext = stageEntity.getRequestContext();
 
-    for (HostEntity hostEntity : hostDAO.findByStage(stageEntity)) {
-      List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findSortedCommandsByStageAndHost(stageEntity, hostEntity);
-      commandsToSend.put(hostEntity.getHostName(), new ArrayList<ExecutionCommandWrapper>());
-      hostRoleCommands.put(hostEntity.getHostName(), new TreeMap<String, HostRoleCommand>());
-      for (HostRoleCommandEntity command : commands) {
-        HostRoleCommand hostRoleCommand = hostRoleCommandFactory.createExisting(command);
-        hostRoleCommands.get(hostEntity.getHostName()).put(hostRoleCommand.getRole().toString(), hostRoleCommand);
-        commandsToSend.get(hostEntity.getHostName()).add(hostRoleCommand.getExecutionCommandWrapper());
+
+    Map<String, List<HostRoleCommandEntity>> hostCommands = hostRoleCommandDAO.findSortedCommandsByStage(stageEntity);
+
+    for (Map.Entry<String, List<HostRoleCommandEntity>> entry : hostCommands.entrySet()) {
+      String hostname = entry.getKey();
+      commandsToSend.put(hostname, new ArrayList<ExecutionCommandWrapper>());
+      hostRoleCommands.put(hostname, new TreeMap<String, HostRoleCommand>());
+      for (HostRoleCommandEntity hostRoleCommandEntity : entry.getValue()) {
+        HostRoleCommand hostRoleCommand = hostRoleCommandFactory.createExisting(hostRoleCommandEntity);
+
+
+        hostRoleCommands.get(hostname).put(hostRoleCommand.getRole().toString(), hostRoleCommand);
+        commandsToSend.get(hostname).add(hostRoleCommand.getExecutionCommandWrapper());
       }
     }
 

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/StageFactory.java

@@ -24,7 +24,7 @@ import org.apache.ambari.server.orm.entities.StageEntity;
 public interface StageFactory {
 
   Stage createNew(long requestId, @Assisted("logDir") String logDir, @Assisted("clusterName") String clusterName,
-                  @Assisted("requestContext") String requestContext);
+                   @Assisted("requestContext") String requestContext);
 
   Stage createExisting(String actionId);
 

+ 6 - 2
ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java

@@ -946,7 +946,7 @@ public class AmbariManagementControllerImpl implements
 
   private Stage createNewStage(Cluster cluster, long requestId, String requestContext) {
     String logDir = baseLogDir + File.pathSeparator + requestId;
-    Stage stage = new Stage(requestId, logDir, cluster.getClusterName(), requestContext);
+    Stage stage = stageFactory.createNew(requestId, logDir, cluster.getClusterName(), requestContext);
     return stage;
   }
 
@@ -1900,6 +1900,8 @@ public class AmbariManagementControllerImpl implements
       Config svcConfig = service.getDesiredConfigs().get(type);
       if (null != svcConfig && !svcConfig.getVersionTag().equals(tag)) {
         props.putAll(svcConfig.getProperties());
+        //TODO why don't update tags with service overrides?
+        tags.put("service_override_tag", svcConfig.getVersionTag());
       }
 
       // 3) apply the host overrides, if any
@@ -1913,7 +1915,8 @@ public class AmbariManagementControllerImpl implements
         }
       }
 
-      configurations.put(type, props);
+      //TODO store empty map for now
+//      configurations.put(type, props);
       configTags.put(type, tags);
     }
 
@@ -4165,6 +4168,7 @@ public class AmbariManagementControllerImpl implements
       actionRequest.getActionName()).getExecutionCommand();
 
     execCmd.setConfigurations(configurations);
+    execCmd.setConfigurationTags(configTags);
 
     Map<String, String> params = new TreeMap<String, String>();
     params.put("jdk_location", this.jdkResourceUrl);

+ 2 - 0
ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java

@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.ambari.eventdb.webservice.WorkflowJsonService;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.ExecutionCommandWrapper;
 import org.apache.ambari.server.agent.HeartBeatHandler;
 import org.apache.ambari.server.agent.rest.AgentResource;
 import org.apache.ambari.server.api.AmbariPersistFilter;
@@ -41,6 +42,7 @@ import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.PersistenceType;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
 import org.apache.ambari.server.orm.dao.MetainfoDAO;
 import org.apache.ambari.server.resources.ResourceManager;
 import org.apache.ambari.server.resources.api.rest.GetResource;

+ 2 - 0
ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java

@@ -90,6 +90,8 @@ public class ControllerModule extends AbstractModule {
     bind(AbstractRootServiceResponseFactory.class).to(RootServiceResponseFactory.class);
     bind(HBaseMasterPortScanner.class).in(Singleton.class);
     bind(ServerActionManager.class).to(ServerActionManagerImpl.class);
+
+    requestStaticInjection(ExecutionCommandWrapper.class);
   }
 
   private JpaPersistModule buildJpaPersistModule() {

+ 33 - 6
ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java

@@ -34,8 +34,7 @@ import javax.persistence.EntityManager;
 import javax.persistence.Query;
 import javax.persistence.TypedQuery;
 
-import java.util.Collection;
-import java.util.List;
+import java.util.*;
 
 @Singleton
 public class HostRoleCommandDAO {
@@ -108,6 +107,27 @@ public class HostRoleCommandDAO {
     return daoUtils.selectList(query, stageEntity, hostEntity);
   }
 
+  @Transactional
+  public Map<String, List<HostRoleCommandEntity>> findSortedCommandsByStage(StageEntity stageEntity) {
+    TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery("SELECT hostRoleCommand " +
+        "FROM HostRoleCommandEntity hostRoleCommand " +
+        "WHERE hostRoleCommand.stage=?1 " +
+        "ORDER BY hostRoleCommand.hostName, hostRoleCommand.taskId", HostRoleCommandEntity.class);
+    List<HostRoleCommandEntity> commandEntities = daoUtils.selectList(query, stageEntity);
+
+    Map<String, List<HostRoleCommandEntity>> hostCommands = new HashMap<String, List<HostRoleCommandEntity>>();
+
+    for (HostRoleCommandEntity commandEntity : commandEntities) {
+      if (!hostCommands.containsKey(commandEntity.getHostName())) {
+        hostCommands.put(commandEntity.getHostName(), new ArrayList<HostRoleCommandEntity>());
+      }
+
+      hostCommands.get(commandEntity.getHostName()).add(commandEntity);
+    }
+
+    return hostCommands;
+  }
+
   @Transactional
   public List<HostRoleCommandEntity> findByHostRole(String hostName, long requestId, long stageId, Role role) {
     TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery("SELECT command " +
@@ -134,11 +154,18 @@ public class HostRoleCommandDAO {
    * NB: You cannot rely on return value if batch write is enabled
    */
   public int updateStatusByRequestId(long requestId, HostRoleStatus target, Collection<HostRoleStatus> sources) {
-    Query query = entityManagerProvider.get().createQuery("UPDATE HostRoleCommandEntity command " +
-        "SET command.status=?1 " +
-        "WHERE command.requestId=?2 AND command.status IN ?3");
+    TypedQuery<HostRoleCommandEntity> selectQuery = entityManagerProvider.get().createQuery("SELECT command " +
+        "FROM HostRoleCommandEntity command " +
+        "WHERE command.requestId=?1 AND command.status IN ?2", HostRoleCommandEntity.class);
+
+    List<HostRoleCommandEntity> commandEntities = daoUtils.selectList(selectQuery, requestId, sources);
+
+    for (HostRoleCommandEntity entity : commandEntities) {
+      entity.setStatus(target);
+      merge(entity);
+    }
 
-    return daoUtils.executeUpdate(query, target, requestId, sources);
+    return commandEntities.size();
   }
 
   @Transactional

+ 0 - 2
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java

@@ -18,8 +18,6 @@
 
 package org.apache.ambari.server.orm.entities;
 
-import org.apache.commons.lang.StringUtils;
-
 import javax.persistence.*;
 import java.util.Collection;
 

+ 11 - 0
ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java

@@ -60,7 +60,9 @@ import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
 import org.apache.ambari.server.orm.dao.RoleDAO;
+import org.apache.ambari.server.orm.entities.ExecutionCommandEntity;
 import org.apache.ambari.server.orm.entities.RoleEntity;
 import org.apache.ambari.server.security.authorization.Users;
 import org.apache.ambari.server.serveraction.ServerAction;
@@ -3559,6 +3561,15 @@ public class AmbariManagementControllerTest {
     List<HostRoleCommand> storedTasks = actionDB.getRequestTasks(response.getRequestId());
     Stage stage = actionDB.getAllStages(response.getRequestId()).get(0);
 
+    //Check configs not stored with execution command
+    ExecutionCommandDAO executionCommandDAO = injector.getInstance(ExecutionCommandDAO.class);
+    ExecutionCommandEntity commandEntity = executionCommandDAO.findByPK(task.getTaskId());
+    ExecutionCommand executionCommand =
+        StageUtils.fromJson(new String(commandEntity.getCommand()), ExecutionCommand.class);
+
+    assertFalse(executionCommand.getConfigurationTags().isEmpty());
+    assertTrue(executionCommand.getConfigurations() == null || executionCommand.getConfigurations().isEmpty());
+
     assertEquals(1, storedTasks.size());
     HostRoleCommand hostRoleCommand = storedTasks.get(0);
 

+ 2 - 1
ambari-server/src/test/java/org/apache/ambari/server/orm/JdbcPropertyTest.java

@@ -30,10 +30,11 @@ import com.google.inject.Guice;
 import com.google.inject.Injector;
 
 public class JdbcPropertyTest {
-  Properties properties = new Properties();
+  Properties properties;
 
   @Before
   public void configure() {
+    properties = new Properties();
     properties.setProperty(Configuration.SERVER_PERSISTENCE_TYPE_KEY, "in-memory");
     properties.setProperty(Configuration.METADETA_DIR_PATH, "src/test/resources/stacks");
     properties.setProperty(Configuration.SERVER_VERSION_FILE, "target/version");