Ver código fonte

AMBARI-4983. During upgrade. migrate decommissioned DN hosts list to the new format. (swagle)

Siddharth Wagle 11 anos atrás
pai
commit
0f576396df

+ 79 - 0
ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog150.java

@@ -1,5 +1,6 @@
 package org.apache.ambari.server.upgrade;
 
+import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
 import org.apache.ambari.server.AmbariException;
@@ -8,10 +9,18 @@ import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.orm.DBAccessor.DBColumnInfo;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
 import org.apache.ambari.server.orm.dao.ClusterStateDAO;
+import org.apache.ambari.server.orm.dao.HostComponentDesiredStateDAO;
 import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
+import org.apache.ambari.server.orm.dao.KeyValueDAO;
+import org.apache.ambari.server.orm.entities.ClusterConfigEntity;
+import org.apache.ambari.server.orm.entities.ClusterConfigEntityPK;
 import org.apache.ambari.server.orm.entities.ClusterEntity;
 import org.apache.ambari.server.orm.entities.ClusterStateEntity;
+import org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntity;
+import org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntityPK;
 import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
+import org.apache.ambari.server.orm.entities.KeyValueEntity;
+import org.apache.ambari.server.state.HostComponentAdminState;
 import org.eclipse.persistence.jpa.JpaEntityManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,6 +34,7 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 public class UpgradeCatalog150 extends AbstractUpgradeCatalog {
   private static final Logger LOG = LoggerFactory.getLogger(UpgradeCatalog150.class);
@@ -418,6 +428,75 @@ public class UpgradeCatalog150 extends AbstractUpgradeCatalog {
     // Finally update schema version
     updateMetaInfoVersion(getTargetVersion());
 
+         // Move decommissioned datanode data to new table
+    executeInTransaction(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          processDecommissionedDatanodes();
+        } catch (Exception e) {
+          LOG.warn("Updating decommissioned datanodes to new format threw " +
+            "exception. ", e);
+        }
+      }
+    });
+  }
+
+  protected void processDecommissionedDatanodes() {
+    KeyValueDAO keyValueDAO = injector.getInstance(KeyValueDAO.class);
+    ClusterDAO clusterDAO = injector.getInstance(ClusterDAO.class);
+    Gson gson = injector.getInstance(Gson.class);
+    HostComponentDesiredStateDAO desiredStateDAO = injector.getInstance
+      (HostComponentDesiredStateDAO.class);
+
+    KeyValueEntity keyValueEntity = keyValueDAO.findByKey("decommissionDataNodesTag");
+    String value = null;
+    if (keyValueEntity != null) {
+      value = keyValueEntity.getValue();
+      if (value != null && !value.isEmpty()) {
+        List<ClusterEntity> clusterEntities = clusterDAO.findAll();
+        for (ClusterEntity clusterEntity : clusterEntities) {
+          Long clusterId = clusterEntity.getClusterId();
+          ClusterConfigEntityPK configEntityPK = new ClusterConfigEntityPK();
+          configEntityPK.setClusterId(clusterId);
+          configEntityPK.setType("hdfs-exclude-file");
+          configEntityPK.setTag(value.trim());
+          ClusterConfigEntity configEntity = clusterDAO.findConfig(configEntityPK);
+          if (configEntity != null) {
+            String configData = configEntity.getData();
+            if (configData != null) {
+              Map<String, String> properties = gson.<Map<String, String>>fromJson(configData, Map.class);
+              if (properties != null && !properties.isEmpty()) {
+                String decommissionedNodes = properties.get("datanodes");
+                if (decommissionedNodes != null) {
+                  String[] nodes = decommissionedNodes.split(",");
+                  if (nodes.length > 0) {
+                    for (String node : nodes) {
+                      HostComponentDesiredStateEntityPK entityPK =
+                        new HostComponentDesiredStateEntityPK();
+                      entityPK.setClusterId(clusterId);
+                      entityPK.setServiceName("HDFS");
+                      entityPK.setComponentName("DATANODE");
+                      entityPK.setHostName(node.trim());
+                      HostComponentDesiredStateEntity desiredStateEntity =
+                        desiredStateDAO.findByPK(entityPK);
+                      desiredStateEntity.setAdminState(HostComponentAdminState.DECOMMISSIONED);
+                      desiredStateDAO.merge(desiredStateEntity);
+                    }
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+      // Rename saved key value entity so that the move is finalized
+      KeyValueEntity newEntity = new KeyValueEntity();
+      newEntity.setKey("decommissionDataNodesTag-Moved");
+      newEntity.setValue(value);
+      keyValueDAO.create(newEntity);
+      keyValueDAO.remove(keyValueEntity);
+    }
   }
 
   private String getPostgresServiceConfigMappingQuery() {

+ 162 - 0
ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog150Test.java

@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.upgrade;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.persist.PersistService;
+import com.google.inject.persist.Transactional;
+import junit.framework.Assert;
+import org.apache.ambari.server.orm.DBAccessorImpl;
+import org.apache.ambari.server.orm.GuiceJpaInitializer;
+import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.dao.ClusterDAO;
+import org.apache.ambari.server.orm.dao.ClusterServiceDAO;
+import org.apache.ambari.server.orm.dao.HostComponentDesiredStateDAO;
+import org.apache.ambari.server.orm.dao.HostDAO;
+import org.apache.ambari.server.orm.dao.KeyValueDAO;
+import org.apache.ambari.server.orm.dao.ServiceComponentDesiredStateDAO;
+import org.apache.ambari.server.orm.entities.ClusterConfigEntity;
+import org.apache.ambari.server.orm.entities.ClusterEntity;
+import org.apache.ambari.server.orm.entities.ClusterServiceEntity;
+import org.apache.ambari.server.orm.entities.HostComponentDesiredStateEntity;
+import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.KeyValueEntity;
+import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntity;
+import org.apache.ambari.server.state.HostComponentAdminState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.ResultSet;
+import java.util.Collections;
+
+public class UpgradeCatalog150Test {
+  private Injector injector;
+  private final String CLUSTER_NAME = "c1";
+  private final String SERVICE_NAME = "HDFS";
+  private final String HOST_NAME = "h1";
+
+  @Before
+  public void setup() throws Exception {
+    injector = Guice.createInjector(new InMemoryDefaultTestModule());
+    injector.getInstance(GuiceJpaInitializer.class);
+  }
+
+  @After
+  public void tearDown() {
+    injector.getInstance(PersistService.class).stop();
+  }
+
+  private ClusterEntity createCluster() {
+    ClusterDAO clusterDAO = injector.getInstance(ClusterDAO.class);
+    ClusterEntity clusterEntity = new ClusterEntity();
+    clusterEntity.setClusterId(1L);
+    clusterEntity.setClusterName(CLUSTER_NAME);
+    clusterDAO.create(clusterEntity);
+    return clusterEntity;
+  }
+
+  private ClusterServiceEntity createService(ClusterEntity clusterEntity) {
+    ClusterServiceDAO clusterServiceDAO = injector.getInstance(ClusterServiceDAO.class);
+    ClusterServiceEntity clusterServiceEntity = new ClusterServiceEntity();
+    clusterServiceEntity.setClusterId(1L);
+    clusterServiceEntity.setClusterEntity(clusterEntity);
+    clusterServiceEntity.setServiceName(SERVICE_NAME);
+    clusterServiceDAO.create(clusterServiceEntity);
+    return clusterServiceEntity;
+  }
+
+  private HostEntity createHost(ClusterEntity clusterEntity) {
+    HostDAO hostDAO = injector.getInstance(HostDAO.class);
+    ClusterDAO clusterDAO = injector.getInstance(ClusterDAO.class);
+    HostEntity hostEntity = new HostEntity();
+    hostEntity.setHostName(HOST_NAME);
+    hostEntity.setClusterEntities(Collections.singletonList(clusterEntity));
+    hostDAO.create(hostEntity);
+    clusterEntity.getHostEntities().add(hostEntity);
+    clusterDAO.merge(clusterEntity);
+    return hostEntity;
+  }
+
+  @Test
+  public void testProcessDecommissionedDatanodes() throws Exception {
+    ClusterEntity clusterEntity = createCluster();
+    ClusterServiceEntity clusterServiceEntity = createService(clusterEntity);
+    HostEntity hostEntity = createHost(clusterEntity);
+
+    ServiceComponentDesiredStateEntity componentDesiredStateEntity =
+      new ServiceComponentDesiredStateEntity();
+    componentDesiredStateEntity.setClusterId(clusterEntity.getClusterId());
+    componentDesiredStateEntity.setServiceName(clusterServiceEntity.getServiceName());
+    componentDesiredStateEntity.setClusterServiceEntity(clusterServiceEntity);
+    componentDesiredStateEntity.setComponentName("DATANODE");
+
+    //componentDesiredStateDAO.create(componentDesiredStateEntity);
+
+    HostComponentDesiredStateDAO hostComponentDesiredStateDAO =
+      injector.getInstance(HostComponentDesiredStateDAO.class);
+
+    HostComponentDesiredStateEntity hostComponentDesiredStateEntity =
+      new HostComponentDesiredStateEntity();
+
+    hostComponentDesiredStateEntity.setClusterId(clusterEntity.getClusterId());
+    hostComponentDesiredStateEntity.setComponentName("DATANODE");
+    hostComponentDesiredStateEntity.setAdminState(HostComponentAdminState.INSERVICE);
+    hostComponentDesiredStateEntity.setServiceName(clusterServiceEntity.getServiceName());
+    hostComponentDesiredStateEntity.setServiceComponentDesiredStateEntity(componentDesiredStateEntity);
+    hostComponentDesiredStateEntity.setHostEntity(hostEntity);
+    hostComponentDesiredStateEntity.setHostName(hostEntity.getHostName());
+
+    hostComponentDesiredStateDAO.create(hostComponentDesiredStateEntity);
+
+    HostComponentDesiredStateEntity entity = hostComponentDesiredStateDAO.findAll().get(0);
+
+    Assert.assertEquals(HostComponentAdminState.INSERVICE.name(), entity.getAdminState().name());
+
+    KeyValueDAO keyValueDAO = injector.getInstance(KeyValueDAO.class);
+    KeyValueEntity keyValueEntity = new KeyValueEntity();
+    keyValueEntity.setKey("decommissionDataNodesTag");
+    keyValueEntity.setValue("1394147791230");
+    keyValueDAO.create(keyValueEntity);
+
+    ClusterDAO clusterDAO = injector.getInstance(ClusterDAO.class);
+    ClusterConfigEntity configEntity = new ClusterConfigEntity();
+    configEntity.setClusterEntity(clusterEntity);
+    configEntity.setClusterId(clusterEntity.getClusterId());
+    configEntity.setType("hdfs-exclude-file");
+    configEntity.setTag("1394147791230");
+    configEntity.setData("{\"datanodes\":\"" + HOST_NAME + "\"}");
+    configEntity.setTimestamp(System.currentTimeMillis());
+    clusterDAO.createConfig(configEntity);
+
+    UpgradeCatalog150 upgradeCatalog150 = injector.getInstance(UpgradeCatalog150.class);
+
+    upgradeCatalog150.processDecommissionedDatanodes();
+
+    entity = hostComponentDesiredStateDAO.findAll().get(0);
+
+    Assert.assertEquals(HostComponentAdminState.DECOMMISSIONED.name(), entity.getAdminState().name());
+
+    keyValueEntity = keyValueDAO.findByKey("decommissionDataNodesTag");
+    Assert.assertNull(keyValueEntity);
+    keyValueEntity = keyValueDAO.findByKey("decommissionDataNodesTag-Moved");
+    Assert.assertNotNull(keyValueEntity);
+    Assert.assertEquals("1394147791230", keyValueEntity.getValue());
+  }
+}