Explorar o código

AMBARI-5089. Upgrade Issue with History Server in HDP 1. (aonishuk)

Andrew Onischuk hai 11 anos
pai
achega
c82ea30e92

+ 108 - 2
ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog150.java

@@ -3,24 +3,36 @@ package org.apache.ambari.server.upgrade;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
+
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.orm.DBAccessor.DBColumnInfo;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
+import org.apache.ambari.server.orm.dao.ClusterServiceDAO;
 import org.apache.ambari.server.orm.dao.ClusterStateDAO;
 import org.apache.ambari.server.orm.dao.HostComponentDesiredStateDAO;
+import org.apache.ambari.server.orm.dao.HostComponentStateDAO;
+import org.apache.ambari.server.orm.dao.HostDAO;
 import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
 import org.apache.ambari.server.orm.dao.KeyValueDAO;
+import org.apache.ambari.server.orm.dao.ServiceComponentDesiredStateDAO;
 import org.apache.ambari.server.orm.entities.ClusterConfigEntity;
 import org.apache.ambari.server.orm.entities.ClusterConfigEntityPK;
 import org.apache.ambari.server.orm.entities.ClusterEntity;
+import org.apache.ambari.server.orm.entities.ClusterServiceEntity;
+import org.apache.ambari.server.orm.entities.ClusterServiceEntityPK;
 import org.apache.ambari.server.orm.entities.ClusterStateEntity;
 import org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntity;
 import org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntityPK;
+import org.apache.ambari.server.orm.entities.HostComponentStateEntity;
+import org.apache.ambari.server.orm.entities.HostEntity;
 import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
 import org.apache.ambari.server.orm.entities.KeyValueEntity;
+import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntity;
+import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntityPK;
 import org.apache.ambari.server.state.HostComponentAdminState;
+import org.apache.ambari.server.state.State;
 import org.eclipse.persistence.jpa.JpaEntityManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,6 +41,7 @@ import javax.persistence.EntityManager;
 import javax.persistence.Query;
 import javax.persistence.TypedQuery;
 import javax.persistence.criteria.*;
+
 import java.io.File;
 import java.io.IOException;
 import java.sql.SQLException;
@@ -427,8 +440,16 @@ public class UpgradeCatalog150 extends AbstractUpgradeCatalog {
         }
       }
     });
-
-
+    
+    // add history server on the host where jobtracker is
+    executeInTransaction(new Runnable() {
+      @Override
+      public void run() {
+        addHistoryServer();
+      }
+    });
+    
+      
     // ========================================================================
     // Finally update schema version
     updateMetaInfoVersion(getTargetVersion());
@@ -446,6 +467,91 @@ public class UpgradeCatalog150 extends AbstractUpgradeCatalog {
       }
     });
   }
+  
+  protected void addHistoryServer() {
+    ClusterDAO clusterDAO = injector.getInstance(ClusterDAO.class);
+    ClusterServiceDAO clusterServiceDAO = injector.getInstance(ClusterServiceDAO.class);
+    ServiceComponentDesiredStateDAO serviceComponentDesiredStateDAO = injector.getInstance(ServiceComponentDesiredStateDAO.class);
+    
+    List<ClusterEntity> clusterEntities = clusterDAO.findAll();
+    for (final ClusterEntity clusterEntity : clusterEntities) {
+      ServiceComponentDesiredStateEntityPK pkHS = new ServiceComponentDesiredStateEntityPK();
+      pkHS.setComponentName("HISTORYSERVER");
+      pkHS.setClusterId(clusterEntity.getClusterId());
+      pkHS.setServiceName("MAPREDUCE");
+      
+      ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntityHS = serviceComponentDesiredStateDAO.findByPK(pkHS);
+      
+      // already have historyserver
+      if(serviceComponentDesiredStateEntityHS != null)
+        continue;
+      
+      ServiceComponentDesiredStateEntityPK pkJT = new ServiceComponentDesiredStateEntityPK();
+      pkJT.setComponentName("JOBTRACKER");
+      pkJT.setClusterId(clusterEntity.getClusterId());
+      pkJT.setServiceName("MAPREDUCE");
+      
+      ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntityJT = serviceComponentDesiredStateDAO.findByPK(pkJT);
+      
+      // no jobtracker present probably mapreduce is not installed
+      if(serviceComponentDesiredStateEntityJT == null)
+        continue;
+
+      
+      HostComponentStateEntity jtHostComponentStateEntity = serviceComponentDesiredStateEntityJT.getHostComponentStateEntities().iterator().next();
+      String jtHostname = jtHostComponentStateEntity.getHostName();
+      State jtCurrState = jtHostComponentStateEntity.getCurrentState();
+          
+      ClusterServiceEntityPK pk = new ClusterServiceEntityPK();
+      pk.setClusterId(clusterEntity.getClusterId());
+      pk.setServiceName("MAPREDUCE");
+      
+      ClusterServiceEntity clusterServiceEntity = clusterServiceDAO.findByPK(pk);
+      
+      
+      final ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntity = new ServiceComponentDesiredStateEntity();
+      serviceComponentDesiredStateEntity.setComponentName("HISTORYSERVER");
+      serviceComponentDesiredStateEntity.setDesiredStackVersion(clusterEntity.getDesiredStackVersion());
+      serviceComponentDesiredStateEntity.setDesiredState(State.STARTED);
+      serviceComponentDesiredStateEntity.setClusterServiceEntity(clusterServiceEntity);
+      serviceComponentDesiredStateEntity.setHostComponentDesiredStateEntities(new ArrayList<HostComponentDesiredStateEntity>());
+
+      final HostComponentStateEntity stateEntity = new HostComponentStateEntity();
+      stateEntity.setHostName(jtHostname);
+      stateEntity.setCurrentState(jtCurrState);
+      stateEntity.setCurrentStackVersion(clusterEntity.getDesiredStackVersion());
+      
+      final HostComponentDesiredStateEntity desiredStateEntity = new HostComponentDesiredStateEntity();
+      desiredStateEntity.setDesiredState(State.STARTED);
+      desiredStateEntity.setDesiredStackVersion(clusterEntity.getDesiredStackVersion());
+      
+      persistComponentEntities(stateEntity, desiredStateEntity, serviceComponentDesiredStateEntity);
+    }
+  }
+    
+  private void persistComponentEntities(HostComponentStateEntity stateEntity, HostComponentDesiredStateEntity desiredStateEntity, ServiceComponentDesiredStateEntity serviceComponentDesiredStateEntity) {
+    ServiceComponentDesiredStateDAO serviceComponentDesiredStateDAO = injector.getInstance(ServiceComponentDesiredStateDAO.class);
+    HostComponentStateDAO hostComponentStateDAO = injector.getInstance(HostComponentStateDAO.class);
+    HostComponentDesiredStateDAO hostComponentDesiredStateDAO = injector.getInstance(HostComponentDesiredStateDAO.class);
+    HostDAO hostDAO = injector.getInstance(HostDAO.class);
+    
+    HostEntity hostEntity = hostDAO.findByName(stateEntity.getHostName());
+    hostEntity.getHostComponentStateEntities().add(stateEntity);
+    hostEntity.getHostComponentDesiredStateEntities().add(desiredStateEntity);
+
+    serviceComponentDesiredStateEntity.getHostComponentDesiredStateEntities().add(desiredStateEntity);
+
+    desiredStateEntity.setServiceComponentDesiredStateEntity(serviceComponentDesiredStateEntity);
+    desiredStateEntity.setHostEntity(hostEntity);
+    stateEntity.setServiceComponentDesiredStateEntity(serviceComponentDesiredStateEntity);
+    stateEntity.setHostEntity(hostEntity);
+
+    hostComponentStateDAO.create(stateEntity);
+    hostComponentDesiredStateDAO.create(desiredStateEntity);
+
+    serviceComponentDesiredStateDAO.create(serviceComponentDesiredStateEntity);
+    hostDAO.merge(hostEntity);
+  }
 
   protected void processDecommissionedDatanodes() {
     KeyValueDAO keyValueDAO = injector.getInstance(KeyValueDAO.class);

+ 1 - 1
ambari-server/src/main/resources/stacks/HDP/1.3.2/services/MAPREDUCE/package/scripts/params.py

@@ -56,7 +56,7 @@ exclude_file_path = config['configurations']['mapred-site']['mapred.hosts.exclud
 mapred_hosts_file_path = config['configurations']['mapred-site']['mapred.hosts']
 
 #hdfs directories
-mapreduce_jobhistory_intermediate_done_dir = config['configurations']['mapred-site']['mapreduce.jobhistory.intermediate-done-dir']
+mapreduce_jobhistory_intermediate_done_dir = default('/configurations/mapred-site/mapreduce.jobhistory.intermediate-done-dir', '/mr-history/tmp')
 mapreduce_jobhistory_done_dir = config['configurations']['mapred-site']['mapred.job.tracker.history.completed.location']
 #for create_hdfs_directory
 hostname = config["hostname"]

+ 92 - 4
ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog150Test.java

@@ -20,9 +20,10 @@ package org.apache.ambari.server.upgrade;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.persist.PersistService;
-import com.google.inject.persist.Transactional;
+
 import junit.framework.Assert;
-import org.apache.ambari.server.orm.DBAccessorImpl;
+
+import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
@@ -30,27 +31,31 @@ import org.apache.ambari.server.orm.dao.ClusterServiceDAO;
 import org.apache.ambari.server.orm.dao.HostComponentDesiredStateDAO;
 import org.apache.ambari.server.orm.dao.HostDAO;
 import org.apache.ambari.server.orm.dao.KeyValueDAO;
-import org.apache.ambari.server.orm.dao.ServiceComponentDesiredStateDAO;
 import org.apache.ambari.server.orm.entities.ClusterConfigEntity;
 import org.apache.ambari.server.orm.entities.ClusterEntity;
 import org.apache.ambari.server.orm.entities.ClusterServiceEntity;
 import org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntity;
+import org.apache.ambari.server.orm.entities.HostComponentStateEntity;
 import org.apache.ambari.server.orm.entities.HostEntity;
 import org.apache.ambari.server.orm.entities.KeyValueEntity;
 import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntity;
+import org.apache.ambari.server.orm.entities.ServiceDesiredStateEntity;
 import org.apache.ambari.server.state.HostComponentAdminState;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.sql.ResultSet;
+import java.util.ArrayList;
 import java.util.Collections;
 
+import javax.persistence.EntityManager;
+
 public class UpgradeCatalog150Test {
   private Injector injector;
   private final String CLUSTER_NAME = "c1";
   private final String SERVICE_NAME = "HDFS";
   private final String HOST_NAME = "h1";
+  private final String DESIRED_STACK_VERSION = "{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.4\"}";
 
   @Before
   public void setup() throws Exception {
@@ -81,6 +86,26 @@ public class UpgradeCatalog150Test {
     clusterServiceDAO.create(clusterServiceEntity);
     return clusterServiceEntity;
   }
+  
+  private ClusterServiceEntity addService(ClusterEntity clusterEntity, String serviceName) {
+    ClusterDAO clusterDAO = injector.getInstance(ClusterDAO.class);
+    
+    ClusterServiceEntity clusterServiceEntity = new ClusterServiceEntity();
+    clusterServiceEntity.setClusterEntity(clusterEntity);
+    clusterServiceEntity.setServiceName(serviceName);
+    
+    ServiceDesiredStateEntity serviceDesiredStateEntity = new ServiceDesiredStateEntity();
+    serviceDesiredStateEntity.setDesiredStackVersion(DESIRED_STACK_VERSION);
+    serviceDesiredStateEntity.setClusterServiceEntity(clusterServiceEntity);
+    
+    clusterServiceEntity.setServiceDesiredStateEntity(serviceDesiredStateEntity);
+    clusterEntity.getClusterServiceEntities().add(clusterServiceEntity);
+    
+    clusterDAO.merge(clusterEntity);
+    
+    return clusterServiceEntity;
+  }
+
 
   private HostEntity createHost(ClusterEntity clusterEntity) {
     HostDAO hostDAO = injector.getInstance(HostDAO.class);
@@ -93,6 +118,69 @@ public class UpgradeCatalog150Test {
     clusterDAO.merge(clusterEntity);
     return hostEntity;
   }
+  
+  private void addComponent(ClusterEntity clusterEntity, ClusterServiceEntity clusterServiceEntity, HostEntity hostEntity, String componentName) {
+    ServiceComponentDesiredStateEntity componentDesiredStateEntity = new ServiceComponentDesiredStateEntity();
+    componentDesiredStateEntity.setClusterServiceEntity(clusterServiceEntity);
+    componentDesiredStateEntity.setComponentName(componentName);
+    componentDesiredStateEntity.setHostComponentStateEntities(new ArrayList<HostComponentStateEntity>());
+    
+    HostComponentDesiredStateEntity hostComponentDesiredStateEntity = new HostComponentDesiredStateEntity();
+    hostComponentDesiredStateEntity.setAdminState(HostComponentAdminState.INSERVICE);
+    hostComponentDesiredStateEntity.setServiceComponentDesiredStateEntity(componentDesiredStateEntity);
+    hostComponentDesiredStateEntity.setHostEntity(hostEntity);
+    
+    HostComponentStateEntity hostComponentStateEntity = new HostComponentStateEntity();
+    hostComponentStateEntity.setHostEntity(hostEntity);
+    hostComponentStateEntity.setHostName(hostEntity.getHostName());
+    hostComponentStateEntity.setCurrentStackVersion(clusterEntity.getDesiredStackVersion());
+    hostComponentStateEntity.setServiceComponentDesiredStateEntity(componentDesiredStateEntity);
+    
+    componentDesiredStateEntity.getHostComponentStateEntities().add(hostComponentStateEntity);
+    hostEntity.getHostComponentStateEntities().add(hostComponentStateEntity);
+    hostEntity.getHostComponentDesiredStateEntities().add(hostComponentDesiredStateEntity);
+    clusterServiceEntity.getServiceComponentDesiredStateEntities().add(componentDesiredStateEntity);
+    
+    ClusterServiceDAO clusterServiceDAO = injector.getInstance(ClusterServiceDAO.class);
+    clusterServiceDAO.merge(clusterServiceEntity);
+  }
+  
+  protected void executeInTransaction(Runnable func) {
+    EntityManager entityManager = injector.getProvider(EntityManager.class).get();
+    if (entityManager.getTransaction().isActive()) { //already started, reuse
+      func.run();
+    } else {
+      entityManager.getTransaction().begin();
+      try {
+        func.run();
+        entityManager.getTransaction().commit();
+      } catch (Exception e) {
+        //LOG.error("Error in transaction ", e);
+        if (entityManager.getTransaction().isActive()) {
+          entityManager.getTransaction().rollback();
+        }
+        throw new RuntimeException(e);
+      }
+
+    }
+  }
+  
+  @Test
+  public void testAddHistoryServer() throws AmbariException {
+    final ClusterEntity clusterEntity = createCluster();
+    final ClusterServiceEntity clusterServiceEntityMR = addService(clusterEntity, "MAPREDUCE");
+    final HostEntity hostEntity = createHost(clusterEntity);
+    
+    executeInTransaction(new Runnable() {
+      @Override
+      public void run() {
+        addComponent(clusterEntity, clusterServiceEntityMR, hostEntity, "JOBTRACKER");
+      }
+    });
+    
+    UpgradeCatalog150 upgradeCatalog150 = injector.getInstance(UpgradeCatalog150.class);
+    upgradeCatalog150.addHistoryServer();
+  }
 
   @Test
   public void testProcessDecommissionedDatanodes() throws Exception {