瀏覽代碼

AMBARI-6827. Missed "version" column in "clusterconfig" table, and UQ_config_type_tag, UQ_config_type_version constraints. (mpapirkovskyy)

Myroslav Papirkovskyy 10 年之前
父節點
當前提交
2ad28c6322

+ 6 - 0
ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java

@@ -18,6 +18,7 @@
 package org.apache.ambari.server.orm;
 
 import java.io.IOException;
+import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.List;
@@ -30,6 +31,11 @@ import org.eclipse.persistence.sessions.DatabaseSession;
  */
 public interface DBAccessor {
 
+  /**
+   * @return new database connection
+   */
+  Connection getNewConnection();
+
   /**
    * Wraps object name with dbms-specific quotes
    * @param name object name without quotes

+ 11 - 0
ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java

@@ -117,6 +117,17 @@ public class DBAccessorImpl implements DBAccessor {
     return connection;
   }
 
+  @Override
+  public Connection getNewConnection() {
+    try {
+      return DriverManager.getConnection(configuration.getDatabaseUrl(),
+        configuration.getDatabaseUser(),
+        configuration.getDatabasePassword());
+    } catch (SQLException e) {
+      throw new RuntimeException("Unable to connect to database", e);
+    }
+  }
+
   @Override
   public String quoteObjectName(String name) {
     return dbmsHelper.quoteObjectName(name);

+ 111 - 0
ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog170.java

@@ -18,10 +18,14 @@
 
 package org.apache.ambari.server.upgrade;
 
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -267,6 +271,17 @@ public class UpgradeCatalog170 extends AbstractUpgradeCatalog {
 
     dbAccessor.executeQuery("ALTER TABLE clusterconfig ADD PRIMARY KEY (config_id)");
 
+    //fill version column
+    dbAccessor.addColumn("clusterconfig", new DBColumnInfo("version", Long.class, null));
+
+    populateConfigVersions();
+
+    dbAccessor.setNullable("clusterconfig", "version", false);
+
+    dbAccessor.executeQuery("ALTER TABLE clusterconfig ADD CONSTRAINT UQ_config_type_tag UNIQUE (cluster_id, type_name, version_tag)", true);
+    dbAccessor.executeQuery("ALTER TABLE clusterconfig ADD CONSTRAINT UQ_config_type_version UNIQUE (cluster_id, type_name, version)", true);
+
+
     columns.clear();
     columns.add(new DBColumnInfo("service_config_id", Long.class, null, null, false));
     columns.add(new DBColumnInfo("cluster_id", Long.class, null, null, false));
@@ -277,6 +292,8 @@ public class UpgradeCatalog170 extends AbstractUpgradeCatalog {
     columns.add(new DBColumnInfo("note", char[].class, null, null, true));
     dbAccessor.createTable("serviceconfig", columns, "service_config_id");
 
+    dbAccessor.executeQuery("ALTER TABLE serviceconfig ADD CONSTRAINT UQ_scv_service_version UNIQUE (cluster_id, service_name, version)", true);
+
     columns.clear();
     columns.add(new DBColumnInfo("service_config_id", Long.class, null, null, false));
     columns.add(new DBColumnInfo("config_id", Long.class, null, null, false));
@@ -286,6 +303,100 @@ public class UpgradeCatalog170 extends AbstractUpgradeCatalog {
       new String[]{"cluster_id", "config_type", "version_tag"}, "clusterconfig",
       new String[]{"cluster_id", "type_name", "version_tag"}, true);
 
+
+
+    //service config version sequences
+    String valueColumnName = "\"value\"";
+    if (Configuration.ORACLE_DB_NAME.equals(dbType)
+      || Configuration.MYSQL_DB_NAME.equals(dbType)) {
+      valueColumnName = "value";
+    }
+
+    dbAccessor.executeQuery("INSERT INTO ambari_sequences(sequence_name, "
+      + valueColumnName + ") " + "VALUES('service_config_id_seq', 1)", false);
+
+    dbAccessor.executeQuery("INSERT INTO ambari_sequences(sequence_name, "
+      + valueColumnName + ") " + "VALUES('service_config_application_id_seq', 1)", false);
+
+    long count = 1;
+    ResultSet resultSet = null;
+    try {
+      resultSet = dbAccessor.executeSelect("SELECT count(*) FROM clusterconfig");
+      if (resultSet.next()) {
+        count = resultSet.getLong(1) + 2;
+      }
+    } finally {
+      if (resultSet != null) {
+        resultSet.close();
+      }
+    }
+
+    dbAccessor.executeQuery("INSERT INTO ambari_sequences(sequence_name, "
+      + valueColumnName + ") " + "VALUES('config_id_seq', " + count + ")", false);
+
+  }
+
+  private void populateConfigVersions() throws SQLException {
+    ResultSet resultSet = dbAccessor.executeSelect("SELECT DISTINCT type_name FROM clusterconfig ");
+    Set<String> configTypes = new HashSet<String>();
+    if (resultSet != null) {
+      try {
+        while (resultSet.next()) {
+          configTypes.add(resultSet.getString("type_name"));
+        }
+      } finally {
+        resultSet.close();
+      }
+    }
+
+    //use new connection to not affect state of internal one
+    Connection connection = dbAccessor.getNewConnection();
+    PreparedStatement orderedConfigsStatement =
+      connection.prepareStatement("SELECT config_id FROM clusterconfig WHERE type_name = ? ORDER BY create_timestamp");
+
+    Map<String, List<Long>> configVersionMap = new HashMap<String, List<Long>>();
+    for (String configType : configTypes) {
+      List<Long> configIds = new ArrayList<Long>();
+      orderedConfigsStatement.setString(1, configType);
+      resultSet = orderedConfigsStatement.executeQuery();
+      if (resultSet != null) {
+        try {
+          while (resultSet.next()) {
+            configIds.add(resultSet.getLong("config_id"));
+          }
+        } finally {
+          resultSet.close();
+        }
+      }
+      configVersionMap.put(configType, configIds);
+    }
+
+    orderedConfigsStatement.close();
+
+    connection.setAutoCommit(false); //disable autocommit
+    PreparedStatement configVersionStatement =
+      connection.prepareStatement("UPDATE clusterconfig SET version = ? WHERE config_id = ?");
+
+
+    try {
+      for (Entry<String, List<Long>> entry : configVersionMap.entrySet()) {
+        long version = 1L;
+        for (Long configId : entry.getValue()) {
+          configVersionStatement.setLong(1, version++);
+          configVersionStatement.setLong(2, configId);
+          configVersionStatement.addBatch();
+        }
+        configVersionStatement.executeBatch();
+      }
+      connection.commit(); //commit changes manually
+    } catch (SQLException e) {
+      connection.rollback();
+      throw e;
+    } finally {
+      configVersionStatement.close();
+      connection.close();
+    }
+
   }