Browse Source

AMBARI-12121 Fix risky thread-shared field evades lock acquisition (dsen)

Dmytro Sen 10 years ago
parent
commit
d68783254a
16 changed files with 373 additions and 215 deletions
  1. 3 1
      ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
  2. 8 4
      ambari-server/src/main/java/org/apache/ambari/server/controller/internal/LdapSyncEventResourceProvider.java
  3. 5 18
      ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java
  4. 37 21
      ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java
  5. 9 11
      ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
  6. 26 14
      ambari-server/src/main/java/org/apache/ambari/server/upgrade/AbstractUpgradeCatalog.java
  7. 29 15
      ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java
  8. 23 11
      ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog150.java
  9. 17 6
      ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog161.java
  10. 59 31
      ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog170.java
  11. 91 48
      ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog210.java
  12. 0 17
      ambari-server/src/test/java/org/apache/ambari/server/orm/DBAccessorImplTest.java
  13. 12 4
      ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog161Test.java
  14. 26 10
      ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog170Test.java
  15. 15 2
      ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog200Test.java
  16. 13 2
      ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog210Test.java

+ 3 - 1
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java

@@ -654,7 +654,9 @@ public abstract class AbstractProviderModule implements ProviderModule,
   private void resetInit() {
     if (initialized) {
       synchronized (this) {
-        initialized = false;
+        if (initialized) {
+          initialized = false;
+        }
       }
     }
   }

+ 8 - 4
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/LdapSyncEventResourceProvider.java

@@ -401,10 +401,14 @@ public class LdapSyncEventResourceProvider extends AbstractControllerResourcePro
     while (processingEvents) {
       LdapSyncEventEntity event;
       synchronized (eventQueue) {
-        event = eventQueue.poll();
-        if (event == null) {
-          processingEvents = false;
-          return;
+        if (processingEvents) {
+          event = eventQueue.poll();
+          if (event == null) {
+            processingEvents = false;
+            return;
+          }
+        } else {
+          break;
         }
       }
 

+ 5 - 18
ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java

@@ -32,6 +32,11 @@ import org.eclipse.persistence.sessions.DatabaseSession;
  */
 public interface DBAccessor {
 
+  /**
+   * @return database connection
+   */
+  Connection getConnection();
+
   /**
    * @return new database connection
    */
@@ -271,24 +276,6 @@ public interface DBAccessor {
    */
   void executeQuery(String query) throws SQLException;
 
-  /**
-   * Execute select query
-   * @param query
-   * @return
-   * @throws SQLException
-   */
-  ResultSet executeSelect(String query) throws SQLException;
-
-  /**
-   * Execute select query
-   * @param query
-   * @param resultSetType
-   * @param resultSetConcur
-   * @return
-   * @throws SQLException
-   */
-  ResultSet executeSelect(String query, int resultSetType, int resultSetConcur) throws SQLException;
-
   /**
    * Execute query on DB
    * @param query

+ 37 - 21
ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java

@@ -69,7 +69,6 @@ public class DBAccessorImpl implements DBAccessor {
   private Configuration configuration;
   private DatabaseMetaData databaseMetaData;
   private static final String dbURLPatternString = "jdbc:(.*?):.*";
-  private Pattern dbURLPattern = Pattern.compile(dbURLPatternString, Pattern.CASE_INSENSITIVE);
   private DbType dbType;
   
   @Inject
@@ -122,7 +121,8 @@ public class DBAccessorImpl implements DBAccessor {
     }
   }
 
-  protected Connection getConnection() {
+  @Override
+  public Connection getConnection() {
     return connection;
   }
 
@@ -414,13 +414,25 @@ public class DBAccessorImpl implements DBAccessor {
   public boolean tableHasConstraint(String tableName, String constraintName) throws SQLException {
     // this kind of request is well lower level as we querying system tables, due that we need for some the name of catalog.
     String query = dbmsHelper.getTableConstraintsStatement(connection.getCatalog(), tableName);
-    ResultSet rs = executeSelect(query);
-    if (rs != null) {
-      while (rs.next()) {
-        if (rs.getString("CONSTRAINT_NAME").equalsIgnoreCase(constraintName)) {
-          return true;
+    Statement statement = null;
+    ResultSet rs = null;
+    try {
+      statement = getConnection().createStatement();
+      rs = statement.executeQuery(query);
+      if (rs != null) {
+        while (rs.next()) {
+          if (rs.getString("CONSTRAINT_NAME").equalsIgnoreCase(constraintName)) {
+            return true;
+          }
         }
       }
+    } finally {
+      if (statement != null) {
+        statement.close();
+      }
+      if (rs != null) {
+        rs.close();
+      }
     }
     return false;
   }
@@ -643,6 +655,10 @@ public class DBAccessorImpl implements DBAccessor {
       if (!ignoreErrors) {
         throw e;
       }
+    } finally {
+      if (statement != null) {
+        statement.close();
+      }
     }
     return 0;  // If error appears and ignoreError is set, return 0 (no changes was made)
   }
@@ -659,18 +675,6 @@ public class DBAccessorImpl implements DBAccessor {
     executeQuery(query, false);
   }
 
-  @Override
-  public ResultSet executeSelect(String query) throws SQLException {
-    Statement statement = getConnection().createStatement();
-    return statement.executeQuery(query);
-  }
-
-  @Override
-  public ResultSet executeSelect(String query, int resultSetType, int resultSetConcur) throws SQLException {
-    Statement statement = getConnection().createStatement(resultSetType, resultSetConcur);
-    return statement.executeQuery(query);
-  }  
-  
   @Override
   public void executeQuery(String query, boolean ignoreFailure) throws SQLException {
     LOG.info("Executing query: {}", query);
@@ -860,8 +864,20 @@ public class DBAccessorImpl implements DBAccessor {
   private ResultSetMetaData getColumnMetadata(String tableName, String columnName) throws SQLException {
     // We doesn't require any actual result except metadata, so WHERE clause shouldn't match
     String query = String.format("SELECT %s FROM %s WHERE 1=2", convertObjectName(columnName), convertObjectName(tableName));
-    ResultSet rs = executeSelect(query);
-    return rs.getMetaData();
+    Statement statement = null;
+    ResultSet rs = null;
+    try {
+      statement = getConnection().createStatement();
+      rs = statement.executeQuery(query);
+      return rs.getMetaData();
+    } finally {
+      if (statement != null) {
+        statement.close();
+      }
+      if (rs != null) {
+        rs.close();
+      }
+    }
   }
 
   @Override

+ 9 - 11
ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java

@@ -158,18 +158,16 @@ public class ClustersImpl implements Clusters {
   }
 
   private void checkLoaded() {
-    if (clustersLoaded) {
-      return;
-    }
-
-    w.lock();
-    try {
-      if (!clustersLoaded) {
-        loadClustersAndHosts();
+    if (!clustersLoaded) {
+      w.lock();
+      try {
+        if (!clustersLoaded) {
+          loadClustersAndHosts();
+        }
+        clustersLoaded = true;
+      } finally {
+        w.unlock();
       }
-      clustersLoaded = true;
-    } finally {
-      w.unlock();
     }
   }
 

+ 26 - 14
ambari-server/src/main/java/org/apache/ambari/server/upgrade/AbstractUpgradeCatalog.java

@@ -42,8 +42,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.persistence.EntityManager;
+import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -124,20 +126,30 @@ public abstract class AbstractUpgradeCatalog implements UpgradeCatalog {
    * @throws SQLException
    */
    protected final void addSequence(String seqName, Long seqDefaultValue, boolean ignoreFailure) throws SQLException{
-    // check if sequence is already in the database
-    ResultSet rs = dbAccessor.executeSelect(String.format("SELECT COUNT(*) from %s where sequence_name='%s'", ambariSequencesTable, seqName));
-
-    if (rs != null) {
-      try {
-        if (rs.next() && rs.getInt(1) == 0) {
-          dbAccessor.executeQuery(String.format("INSERT INTO %s(sequence_name, sequence_value) VALUES('%s', %d)", ambariSequencesTable, seqName, seqDefaultValue), ignoreFailure);
-        } else {
-          LOG.warn("Sequence {} already exists, skipping", seqName);
-        }
-      } finally {
-        rs.close();
-      }
-    }
+     // check if sequence is already in the database
+     Statement statement = null;
+     ResultSet rs = null;
+     try {
+       statement = dbAccessor.getConnection().createStatement();
+       if (statement != null) {
+         rs = statement.executeQuery(String.format("SELECT COUNT(*) from %s where sequence_name='%s'", ambariSequencesTable, seqName));
+
+         if (rs != null) {
+           if (rs.next() && rs.getInt(1) == 0) {
+             dbAccessor.executeQuery(String.format("INSERT INTO %s(sequence_name, sequence_value) VALUES('%s', %d)", ambariSequencesTable, seqName, seqDefaultValue), ignoreFailure);
+           } else {
+             LOG.warn("Sequence {} already exists, skipping", seqName);
+           }
+         }
+       }
+     } finally {
+       if (rs != null) {
+         rs.close();
+       }
+       if (statement != null) {
+         statement.close();
+       }
+     }
   }
 
   /**

+ 29 - 15
ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java

@@ -30,8 +30,10 @@ import org.apache.ambari.server.utils.VersionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -72,29 +74,41 @@ public class SchemaUpgradeHelper {
 
   public String readSourceVersion() {
 
-    ResultSet resultSet = null;
+    Statement statement = null;
+    ResultSet rs = null;
     try {
-      resultSet = dbAccessor.executeSelect("SELECT " + dbAccessor.quoteObjectName("metainfo_value") +
+      statement = dbAccessor.getConnection().createStatement();
+      if (statement != null) {
+        rs = statement.executeQuery("SELECT " + dbAccessor.quoteObjectName("metainfo_value") +
           " from metainfo WHERE " + dbAccessor.quoteObjectName("metainfo_key") + "='version'");
-      if (resultSet.next()) {
-        return resultSet.getString(1);
-      } else {
-        //not found, assume oldest version
-        //doesn't matter as there single upgrade catalog for 1.2.0 - 1.5.0 and 1.4.4 - 1.5.0 upgrades
-        return "1.2.0";
+        if (rs != null && rs.next()) {
+          return rs.getString(1);
+        }
       }
     } catch (SQLException e) {
       throw new RuntimeException("Unable to read database version", e);
-    }finally {
-      if (resultSet != null) {
-        try {
-          resultSet.close();
-        } catch (SQLException e) {
-          throw new RuntimeException("Cannot close result set");
+
+    } finally {
+      {
+        if (rs != null) {
+          try {
+            rs.close();
+          } catch (SQLException e) {
+            throw new RuntimeException("Cannot close result set");
+          }
+        }
+        if (statement != null) {
+          try {
+            statement.close();
+          } catch (SQLException e) {
+            throw new RuntimeException("Cannot close statement");
+          }
         }
       }
     }
-
+    //not found, assume oldest version
+    //doesn't matter as there single upgrade catalog for 1.2.0 - 1.5.0 and 1.4.4 - 1.5.0 upgrades
+    return "1.2.0";
   }
 
   /**

+ 23 - 11
ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog150.java

@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -65,6 +66,7 @@ import org.apache.ambari.server.orm.entities.KeyValueEntity;
 import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntity;
 import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntityPK;
 import org.apache.ambari.server.orm.entities.StackEntity;
+import org.apache.ambari.server.orm.entities.UserEntity;
 import org.apache.ambari.server.state.HostComponentAdminState;
 import org.apache.ambari.server.state.PropertyInfo;
 import org.apache.ambari.server.state.ServiceInfo;
@@ -454,20 +456,30 @@ public class UpgradeCatalog150 extends AbstractUpgradeCatalog {
     // Sequences
     if (dbAccessor.tableExists("ambari_sequences")) {
       if (databaseType == DatabaseType.POSTGRES) {
-
-        ResultSet resultSet = dbAccessor.executeSelect("select * from ambari_sequences where sequence_name in " +
-            "('cluster_id_seq','user_id_seq','host_role_command_id_seq')");
-
+        Statement statement = null;
+        ResultSet rs = null;
         try {
-          if (!resultSet.next()) {
-            dbAccessor.executeQuery(getPostgresSequenceUpgradeQuery(), true);
-            // Deletes
-            dbAccessor.dropSequence("host_role_command_task_id_seq");
-            dbAccessor.dropSequence("users_user_id_seq");
-            dbAccessor.dropSequence("clusters_cluster_id_seq");
+          statement = dbAccessor.getConnection().createStatement();
+          if (statement != null) {
+            rs = statement.executeQuery("select * from ambari_sequences where sequence_name in " +
+              "('cluster_id_seq','user_id_seq','host_role_command_id_seq')");
+            if (rs != null) {
+              if (!rs.next()) {
+                dbAccessor.executeQuery(getPostgresSequenceUpgradeQuery(), true);
+                // Deletes
+                dbAccessor.dropSequence("host_role_command_task_id_seq");
+                dbAccessor.dropSequence("users_user_id_seq");
+                dbAccessor.dropSequence("clusters_cluster_id_seq");
+              }
+            }
           }
         } finally {
-          resultSet.close();
+          if (rs != null) {
+            rs.close();
+          }
+          if (statement != null) {
+            statement.close();
+          }
         }
       }
     }

+ 17 - 6
ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog161.java

@@ -18,8 +18,10 @@
 
 package org.apache.ambari.server.upgrade;
 
+import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -215,15 +217,24 @@ public class UpgradeCatalog161 extends AbstractUpgradeCatalog {
 
 
     long count = 1;
-    ResultSet resultSet = null;
+    Statement statement = null;
+    ResultSet rs = null;
     try {
-      resultSet = dbAccessor.executeSelect("SELECT count(*) FROM viewinstance");
-      if (resultSet.next()) {
-        count = resultSet.getLong(1) + 2;
+      statement = dbAccessor.getConnection().createStatement();
+      if (statement != null) {
+        rs = statement.executeQuery("SELECT count(*) FROM viewinstance");
+        if (rs != null) {
+          if (rs.next()) {
+            count = rs.getLong(1) + 2;
+          }
+        }
       }
     } finally {
-      if (resultSet != null) {
-        resultSet.close();
+      if (rs != null) {
+        rs.close();
+      }
+      if (statement != null) {
+        statement.close();
       }
     }
 

+ 59 - 31
ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog170.java

@@ -94,6 +94,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
@@ -461,15 +462,25 @@ public class UpgradeCatalog170 extends AbstractUpgradeCatalog {
     dbAccessor.executeQuery("INSERT INTO ambari_sequences(sequence_name, sequence_value) VALUES('service_config_application_id_seq', 1)", false);
 
     long count = 1;
-    ResultSet resultSet = null;
+
+    Statement statement = null;
+    ResultSet rs = null;
     try {
-      resultSet = dbAccessor.executeSelect("SELECT count(*) FROM clusterconfig");
-      if (resultSet.next()) {
-        count = resultSet.getLong(1) + 2;
+      statement = dbAccessor.getConnection().createStatement();
+      if (statement != null) {
+        rs = statement.executeQuery("SELECT count(*) FROM clusterconfig");
+        if (rs != null) {
+          if (rs.next()) {
+            count = rs.getLong(1) + 2;
+          }
+        }
       }
     } finally {
-      if (resultSet != null) {
-        resultSet.close();
+      if (rs != null) {
+        rs.close();
+      }
+      if (statement != null) {
+        statement.close();
       }
     }
 
@@ -530,17 +541,8 @@ public class UpgradeCatalog170 extends AbstractUpgradeCatalog {
   }
 
   private void populateConfigVersions() throws SQLException {
-    ResultSet resultSet = dbAccessor.executeSelect("SELECT DISTINCT type_name FROM clusterconfig ");
+    ResultSet resultSet = null;
     Set<String> configTypes = new HashSet<String>();
-    if (resultSet != null) {
-      try {
-        while (resultSet.next()) {
-          configTypes.add(resultSet.getString("type_name"));
-        }
-      } finally {
-        resultSet.close();
-      }
-    }
 
     //use new connection to not affect state of internal one
     Connection connection = null;
@@ -548,6 +550,24 @@ public class UpgradeCatalog170 extends AbstractUpgradeCatalog {
     Map<String, List<Long>> configVersionMap = new HashMap<String, List<Long>>();
     try {
       connection = dbAccessor.getNewConnection();
+
+      Statement statement = null;
+      try {
+        statement = connection.createStatement();
+        if (statement != null) {
+          resultSet = statement.executeQuery("SELECT DISTINCT type_name FROM clusterconfig ");
+          if (resultSet != null) {
+            while (resultSet.next()) {
+              configTypes.add(resultSet.getString("type_name"));
+            }
+          }
+        }
+      } finally {
+        if (statement != null) {
+          statement.close();
+        }
+      }
+
       try {
         orderedConfigsStatement
                 = connection.prepareStatement("SELECT config_id FROM clusterconfig WHERE type_name = ? ORDER BY create_timestamp");
@@ -1360,27 +1380,35 @@ public class UpgradeCatalog170 extends AbstractUpgradeCatalog {
     final ResourceEntity ambariResource = resourceDAO.findAmbariResource();
 
     final Map<UserEntity, List<String>> roles = new HashMap<UserEntity, List<String>>();
-    ResultSet resultSet = null;
+    Statement statement = null;
+    ResultSet rs = null;
     try {
-      resultSet = dbAccessor.executeSelect("SELECT role_name, user_id FROM user_roles");
-      while (resultSet.next()) {
-        final String roleName = resultSet.getString(1);
-        final int userId = resultSet.getInt(2);
-
-        final UserEntity user = userDAO.findByPK(userId);
-        List<String> userRoles = roles.get(user);
-        if (userRoles == null) {
-          userRoles = new ArrayList<String>();
-          roles.put(user, userRoles);
+      statement = dbAccessor.getConnection().createStatement();
+      if (statement != null) {
+        rs = statement.executeQuery("SELECT role_name, user_id FROM user_roles");
+        if (rs != null) {
+          while (rs.next()) {
+            final String roleName = rs.getString(1);
+            final int userId = rs.getInt(2);
+
+            final UserEntity user = userDAO.findByPK(userId);
+            List<String> userRoles = roles.get(user);
+            if (userRoles == null) {
+              userRoles = new ArrayList<String>();
+              roles.put(user, userRoles);
+            }
+            userRoles.add(roleName);
+          }
         }
-        userRoles.add(roleName);
       }
     } finally {
-      if (resultSet != null) {
-        resultSet.close();
+      if (rs != null) {
+        rs.close();
+      }
+      if (statement != null) {
+        statement.close();
       }
     }
-
     for (UserEntity user: userDAO.findAll()) {
       List<String> userRoles = roles.get(user);
       if (userRoles.contains("admin")) {

+ 91 - 48
ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog210.java

@@ -61,6 +61,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -317,17 +318,24 @@ public class UpgradeCatalog210 extends AbstractUpgradeCatalog {
 
     // Sequence value for the hosts table primary key. First record will be 1, so ambari_sequence value must be 0.
     Long hostId = 0L;
-    ResultSet resultSet = null;
+    Statement statement = null;
+    ResultSet rs = null;
     try {
-      // Notice that hosts are ordered by host_id ASC, so any null values are last.
-      resultSet = dbAccessor.executeSelect("SELECT host_name, host_id FROM hosts ORDER BY host_id ASC, host_name ASC");
-      hostId = populateHostsId(resultSet);
+      statement = dbAccessor.getConnection().createStatement();
+      if (statement != null) {
+        rs = statement.executeQuery("SELECT host_name, host_id FROM hosts ORDER BY host_id ASC, host_name ASC");
+        if (rs != null) {
+          hostId = populateHostsId(rs);
+        }
+      }
     } finally {
-      if (resultSet != null) {
-        resultSet.close();
+      if (rs != null) {
+        rs.close();
+      }
+      if (statement != null) {
+        statement.close();
       }
     }
-
     // Insert host id number into ambari_sequences
     addSequence("host_id_seq", hostId, false);
 
@@ -728,30 +736,41 @@ public class UpgradeCatalog210 extends AbstractUpgradeCatalog {
     String INSERT_STACK_ID_TEMPLATE = "UPDATE {0} SET {1} = {2} WHERE cluster_id = {3}";
     // we should do the changes only if they are required
     if (dbAccessor.tableHasColumn(CLUSTERS_TABLE,DESIRED_STACK_VERSION_COLUMN_NAME)) {
-      ResultSet resultSet = dbAccessor.executeSelect("SELECT * FROM " + CLUSTERS_TABLE);
-      try {
-        while (resultSet.next()) {
-          long clusterId = resultSet.getLong("cluster_id");
-          String stackJson = resultSet.getString(DESIRED_STACK_VERSION_COLUMN_NAME);
-          StackId stackId = gson.fromJson(stackJson, StackId.class);
 
-          StackEntity stackEntity = stackDAO.find(stackId.getStackName(),
-                                                   stackId.getStackVersion());
-
-          String clusterConfigSQL = MessageFormat.format(
-                                                          INSERT_STACK_ID_TEMPLATE, "clusterconfig", STACK_ID_COLUMN_NAME,
-                                                          stackEntity.getStackId(), clusterId);
-
-          String serviceConfigSQL = MessageFormat.format(
-                                                          INSERT_STACK_ID_TEMPLATE, "serviceconfig", STACK_ID_COLUMN_NAME,
-                                                          stackEntity.getStackId(), clusterId);
-
-          dbAccessor.executeQuery(clusterConfigSQL);
-          dbAccessor.executeQuery(serviceConfigSQL);
+      Statement statement = null;
+      ResultSet rs = null;
+      try {
+        statement = dbAccessor.getConnection().createStatement();
+        if (statement != null) {
+          rs = statement.executeQuery("SELECT * FROM " + CLUSTERS_TABLE);
+          if (rs != null) {
+            while (rs.next()) {
+              long clusterId = rs.getLong("cluster_id");
+              String stackJson = rs.getString(DESIRED_STACK_VERSION_COLUMN_NAME);
+              StackId stackId = gson.fromJson(stackJson, StackId.class);
+
+              StackEntity stackEntity = stackDAO.find(stackId.getStackName(),
+                stackId.getStackVersion());
+
+              String clusterConfigSQL = MessageFormat.format(
+                INSERT_STACK_ID_TEMPLATE, "clusterconfig", STACK_ID_COLUMN_NAME,
+                stackEntity.getStackId(), clusterId);
+
+              String serviceConfigSQL = MessageFormat.format(
+                INSERT_STACK_ID_TEMPLATE, "serviceconfig", STACK_ID_COLUMN_NAME,
+                stackEntity.getStackId(), clusterId);
+
+              dbAccessor.executeQuery(clusterConfigSQL);
+              dbAccessor.executeQuery(serviceConfigSQL);
+            }
+          }
         }
       } finally {
-        if (null != resultSet) {
-          resultSet.close();
+        if (rs != null) {
+          rs.close();
+        }
+        if (statement != null) {
+          statement.close();
         }
       }
     }
@@ -814,17 +833,25 @@ public class UpgradeCatalog210 extends AbstractUpgradeCatalog {
 
   private String getRandomHostName() throws SQLException {
     String randomHostName = null;
-    ResultSet resultSet = null;
+
+    Statement statement = null;
+    ResultSet rs = null;
     try {
-      resultSet = dbAccessor.executeSelect("SELECT " + HOST_NAME_COL + " FROM " + HOSTS_TABLE + " ORDER BY " + HOST_NAME_COL + " ASC");
-      if (resultSet != null && resultSet.next()) {
-        randomHostName = resultSet.getString(1);
+      statement = dbAccessor.getConnection().createStatement();
+      if (statement != null) {
+        rs = statement.executeQuery("SELECT " + HOST_NAME_COL + " FROM " + HOSTS_TABLE + " ORDER BY " + HOST_NAME_COL + " ASC");
+        if (rs != null && rs.next()) {
+          randomHostName = rs.getString(1);
+        }
       }
     } catch (Exception e) {
       LOG.error("Failed to retrieve random host name. Exception: " + e.getMessage());
     } finally {
-      if (resultSet != null) {
-        resultSet.close();
+      if (rs != null) {
+        rs.close();
+      }
+      if (statement != null) {
+        statement.close();
       }
     }
     return randomHostName;
@@ -840,8 +867,11 @@ public class UpgradeCatalog210 extends AbstractUpgradeCatalog {
         "SELECT * FROM {0} WHERE {1} NOT IN (SELECT {1} FROM {2})",
         HOSTS_TABLE, HOST_ID_COL, CLUSTER_HOST_MAPPING_TABLE);
     ResultSet hostsNotInCluster = null;
+    Statement statement = null;
+
     try {
-      hostsNotInCluster = dbAccessor.executeSelect(hostsNotInClusterQuery);
+      statement = dbAccessor.getConnection().createStatement();
+      hostsNotInCluster = statement.executeQuery(hostsNotInClusterQuery);
       if(hostsNotInCluster != null) {
         while (hostsNotInCluster.next()) {
           long hostToDeleteId = hostsNotInCluster.getLong(HOST_ID_COL);
@@ -850,7 +880,8 @@ public class UpgradeCatalog210 extends AbstractUpgradeCatalog {
           long count = 0;
           ResultSet duplicateHosts = null;
           try {
-            duplicateHosts = dbAccessor.executeSelect(duplicateHostsQuery);
+            statement = dbAccessor.getConnection().createStatement();
+            duplicateHosts = statement.executeQuery(duplicateHostsQuery);
             if (duplicateHosts != null && duplicateHosts.next()) {
               count = duplicateHosts.getLong(1);
             }
@@ -872,6 +903,9 @@ public class UpgradeCatalog210 extends AbstractUpgradeCatalog {
       if (null != hostsNotInCluster) {
         hostsNotInCluster.close();
       }
+      if (statement != null) {
+        statement.close();
+      }
     }
   }
 
@@ -883,25 +917,34 @@ public class UpgradeCatalog210 extends AbstractUpgradeCatalog {
    * @throws SQLException
    */
   private String getDerbyTableConstraintName(String type, String tableName) throws SQLException {
-    ResultSet resultSet = null;
     boolean found = false;
     String constraint = null;
 
+    Statement statement = null;
+    ResultSet rs = null;
     try {
-      resultSet = dbAccessor.executeSelect("SELECT c.constraintname, c.type, t.tablename FROM sys.sysconstraints c, sys.systables t WHERE c.tableid = t.tableid");
-      while(resultSet.next()) {
-        constraint = resultSet.getString(1);
-        String recordType = resultSet.getString(2);
-        String recordTableName = resultSet.getString(3);
-
-        if (recordType.equalsIgnoreCase(type) && recordTableName.equalsIgnoreCase(tableName)) {
-          found = true;
-          break;
+      statement = dbAccessor.getConnection().createStatement();
+      if (statement != null) {
+        rs = statement.executeQuery("SELECT c.constraintname, c.type, t.tablename FROM sys.sysconstraints c, sys.systables t WHERE c.tableid = t.tableid");
+        if (rs != null) {
+          while(rs.next()) {
+            constraint = rs.getString(1);
+            String recordType = rs.getString(2);
+            String recordTableName = rs.getString(3);
+
+            if (recordType.equalsIgnoreCase(type) && recordTableName.equalsIgnoreCase(tableName)) {
+              found = true;
+              break;
+            }
+          }
         }
       }
     } finally {
-      if (resultSet != null) {
-        resultSet.close();
+      if (rs != null) {
+        rs.close();
+      }
+      if (statement != null) {
+        statement.close();
       }
     }
     return found ? constraint : null;

+ 0 - 17
ambari-server/src/test/java/org/apache/ambari/server/orm/DBAccessorImplTest.java

@@ -396,23 +396,6 @@ public class DBAccessorImplTest {
 
   }
 
-  @Test
-  public void testExecuteSelect() throws Exception {
-    DBAccessorImpl dbAccessor = injector.getInstance(DBAccessorImpl.class);
-    String tableName = getFreeTableName();
-    createMyTable(tableName);
-    dbAccessor.executeQuery("insert into " + tableName + "(id, name, time) values(1, 'Bob', 1234567)");
-
-    ResultSet resultSet = dbAccessor.executeSelect("select name from " + tableName + " where id=1");
-    int count = 0;
-    while (resultSet.next()) {
-      assertEquals("Bob", resultSet.getString(1));
-      count++;
-    }
-
-    assertEquals(count, 1);
-  }
-
   @Test
   public void testDBSession() throws Exception {
     DBAccessorImpl dbAccessor = injector.getInstance(DBAccessorImpl.class);

+ 12 - 4
ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog161Test.java

@@ -36,8 +36,10 @@ import static org.easymock.EasyMock.verify;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -70,6 +72,8 @@ public class UpgradeCatalog161Test {
   public void testExecuteDDLUpdates() throws Exception {
 
     final DBAccessor dbAccessor = createNiceMock(DBAccessor.class);
+    Connection connection = createNiceMock(Connection.class);
+    Statement statement = createNiceMock(Statement.class);
     Configuration configuration = createNiceMock(Configuration.class);
     ResultSet resultSet = createNiceMock(ResultSet.class);
     expect(configuration.getDatabaseUrl()).andReturn(Configuration.JDBC_IN_MEMORY_URL).anyTimes();
@@ -88,16 +92,20 @@ public class UpgradeCatalog161Test {
     setOperationLevelEntityConfigExpectations(dbAccessor, operationLevelEntityColumnCapture);
     setViewExpectations(dbAccessor, viewIconColumnCapture, viewIcon64ColumnCapture);
     dbAccessor.addColumn(eq("viewinstance"),
-        anyObject(DBAccessor.DBColumnInfo.class));
+      anyObject(DBAccessor.DBColumnInfo.class));
     setViewInstanceExpectations(dbAccessor, labelColumnCapture, descriptionColumnCapture, visibleColumnCapture, instanceIconColumnCapture, instanceIcon64ColumnCapture);
-    dbAccessor.executeSelect(anyObject(String.class));
+    dbAccessor.getConnection();
+    expectLastCall().andReturn(connection).anyTimes();
+    connection.createStatement();
+    expectLastCall().andReturn(statement).anyTimes();
+    statement.executeQuery(anyObject(String.class));
     expectLastCall().andReturn(resultSet).anyTimes();
     resultSet.next();
     expectLastCall().andReturn(false).anyTimes();
     resultSet.close();
     expectLastCall().anyTimes();
 
-    replay(dbAccessor, configuration, resultSet);
+    replay(dbAccessor, configuration, resultSet, statement, connection);
     AbstractUpgradeCatalog upgradeCatalog = getUpgradeCatalog(dbAccessor);
     Class<?> c = AbstractUpgradeCatalog.class;
     Field f = c.getDeclaredField("configuration");
@@ -105,7 +113,7 @@ public class UpgradeCatalog161Test {
     f.set(upgradeCatalog, configuration);
 
     upgradeCatalog.executeDDLUpdates();
-    verify(dbAccessor, configuration, resultSet);
+    verify(dbAccessor, configuration, resultSet, statement, connection);
 
     assertClusterColumns(provisioningStateColumnCapture);
     assertOperationLevelEntityColumns(operationLevelEntityColumnCapture);

+ 26 - 10
ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog170Test.java

@@ -48,6 +48,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -169,6 +170,7 @@ public class UpgradeCatalog170Test {
   @Test
   public void testExecuteDDLUpdates_DBAccessor() throws Exception {
     final DBAccessor dbAccessor = createNiceMock(DBAccessor.class);
+    Statement statement = createNiceMock(Statement.class);
     Connection connection = createNiceMock(Connection.class);
     PreparedStatement stmt = createNiceMock(PreparedStatement.class);
     Configuration configuration = createNiceMock(Configuration.class);
@@ -251,15 +253,18 @@ public class UpgradeCatalog170Test {
     dbAccessor.createTable(eq("serviceconfigmapping"),
         capture(serviceConfigMappingCapture), eq("service_config_id"),
         eq("config_id"));
-
-    dbAccessor.executeSelect(anyObject(String.class));
+    dbAccessor.getConnection();
+    expectLastCall().andReturn(connection).anyTimes();
+    connection.createStatement();
+    expectLastCall().andReturn(statement).anyTimes();
+    statement.executeQuery(anyObject(String.class));
     expectLastCall().andReturn(resultSet).anyTimes();
     resultSet.next();
     expectLastCall().andReturn(false).anyTimes();
     resultSet.close();
     expectLastCall().anyTimes();
 
-    replay(dbAccessor, configuration, resultSet, connection, stmt);
+    replay(dbAccessor, configuration, resultSet, connection, stmt, statement);
     AbstractUpgradeCatalog upgradeCatalog = getUpgradeCatalog(dbAccessor);
     Class<?> c = AbstractUpgradeCatalog.class;
     Field f = c.getDeclaredField("configuration");
@@ -267,7 +272,7 @@ public class UpgradeCatalog170Test {
     f.set(upgradeCatalog, configuration);
 
     upgradeCatalog.executeDDLUpdates();
-    verify(dbAccessor, configuration, resultSet, connection, stmt);
+    verify(dbAccessor, configuration, resultSet, connection, stmt, statement);
 
     assertClusterConfigColumns(clusterConfigAttributesColumnCapture);
     assertHostgroupConfigColumns(hostgroupConfigAttributesColumnCapture);
@@ -300,6 +305,8 @@ public class UpgradeCatalog170Test {
   @Test
   public void testExecuteDMLUpdates() throws Exception {
     Configuration configuration = createNiceMock(Configuration.class);
+    Connection connection = createNiceMock(Connection.class);
+    Statement statement = createNiceMock(Statement.class);
     DBAccessor dbAccessor = createNiceMock(DBAccessor.class);
     Injector injector = createNiceMock(Injector.class);
     ConfigHelper configHelper = createNiceMock(ConfigHelper.class);
@@ -386,7 +393,13 @@ public class UpgradeCatalog170Test {
     upgradeCatalog.addNewConfigurationsFromXml();
     expectLastCall();
 
-    expect(dbAccessor.executeSelect("SELECT role_name, user_id FROM user_roles")).andReturn(userRolesResultSet).once();
+    dbAccessor.getConnection();
+    expectLastCall().andReturn(connection).anyTimes();
+    connection.createStatement();
+    expectLastCall().andReturn(statement).anyTimes();
+    statement.executeQuery("SELECT role_name, user_id FROM user_roles");
+    expectLastCall().andReturn(userRolesResultSet).once();
+
     expect(entityManager.getTransaction()).andReturn(trans).anyTimes();
     expect(entityManager.getCriteriaBuilder()).andReturn(cb).anyTimes();
     expect(entityManager.createQuery(cq)).andReturn(q).anyTimes();
@@ -527,7 +540,7 @@ public class UpgradeCatalog170Test {
     keyValueDAO.remove(showJobsKeyValue);
     privilegeDAO.create(anyObject(PrivilegeEntity.class));
 
-    replay(entityManager, trans, upgradeCatalog, cb, cq, hrc, q, userRolesResultSet);
+    replay(entityManager, trans, upgradeCatalog, cb, cq, hrc, q, connection, statement, userRolesResultSet);
 
     replay(dbAccessor, configuration, injector, cluster, clusters, amc, config, configHelper, pigConfig);
     replay(userDAO, clusterDAO, viewDAO, viewInstanceDAO, permissionDAO, configGroupConfigMappingDAO);
@@ -550,10 +563,13 @@ public class UpgradeCatalog170Test {
 
     upgradeCatalog.executeDMLUpdates();
 
-    verify(upgradeCatalog, dbAccessor, configuration, injector, cluster, clusters, amc, config, configHelper,
-        jobsView, showJobsKeyValue, privilegeDAO, viewDAO, viewInstanceDAO, resourceDAO, keyValueDAO, userRolesResultSet,
-        userEntity1, userEntity2, userPrincipal1, userPrincipal2, userPrivileges1, userPrivileges2,
-        viewRegistry, clusterEntity, configEntity, configMappingEntity, clusterStateEntity);
+    verify(upgradeCatalog, dbAccessor, configuration, injector, cluster,
+      clusters, amc, config, configHelper,jobsView, showJobsKeyValue,
+      privilegeDAO, viewDAO, viewInstanceDAO, resourceDAO, keyValueDAO,
+      connection, statement, userRolesResultSet, userEntity1, userEntity2,
+      userPrincipal1, userPrincipal2, userPrivileges1, userPrivileges2,
+      viewRegistry, clusterEntity, configEntity, configMappingEntity,
+      clusterStateEntity);
   }
 
   @Test

+ 15 - 2
ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog200Test.java

@@ -24,6 +24,7 @@ import static junit.framework.Assert.assertNotNull;
 import static junit.framework.Assert.assertNull;
 import static junit.framework.Assert.assertTrue;
 import static junit.framework.Assert.fail;
+import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.createMockBuilder;
 import static org.easymock.EasyMock.createNiceMock;
@@ -37,8 +38,10 @@ import static org.easymock.EasyMock.verify;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -129,6 +132,8 @@ public class UpgradeCatalog200Test {
   public void testExecuteDDLUpdates() throws Exception {
     final DBAccessor dbAccessor = createNiceMock(DBAccessor.class);
     Configuration configuration = createNiceMock(Configuration.class);
+    Connection connection = createNiceMock(Connection.class);
+    Statement statement = createNiceMock(Statement.class);
     ResultSet resultSet = createNiceMock(ResultSet.class);
 
     expect(configuration.getDatabaseUrl()).andReturn(Configuration.JDBC_IN_MEMORY_URL).anyTimes();
@@ -250,7 +255,15 @@ public class UpgradeCatalog200Test {
     setViewInstancePropertyExpectations(dbAccessor, valueColumnCapture);
     setViewInstanceDataExpectations(dbAccessor, dataValueColumnCapture);
 
-    replay(dbAccessor, configuration, resultSet);
+    // AbstractUpgradeCatalog.addSequence()
+    dbAccessor.getConnection();
+    expectLastCall().andReturn(connection).anyTimes();
+    connection.createStatement();
+    expectLastCall().andReturn(statement).anyTimes();
+    statement.executeQuery(anyObject(String.class));
+    expectLastCall().andReturn(resultSet).anyTimes();
+
+    replay(dbAccessor, configuration, resultSet, statement, connection);
 
     AbstractUpgradeCatalog upgradeCatalog = getUpgradeCatalog(dbAccessor);
     Class<?> c = AbstractUpgradeCatalog.class;
@@ -259,7 +272,7 @@ public class UpgradeCatalog200Test {
     f.set(upgradeCatalog, configuration);
 
     upgradeCatalog.executeDDLUpdates();
-    verify(dbAccessor, configuration, resultSet);
+    verify(dbAccessor, configuration, resultSet, statement, connection);
 
     // verify columns for alert_definition
     verifyAlertDefinitionIgnoreColumn(alertDefinitionIgnoreColumnCapture);

+ 13 - 2
ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog210Test.java

@@ -19,6 +19,7 @@
 package org.apache.ambari.server.upgrade;
 
 import static junit.framework.Assert.assertEquals;
+import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.createMockBuilder;
 import static org.easymock.EasyMock.createNiceMock;
@@ -32,8 +33,10 @@ import static org.easymock.EasyMock.verify;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -119,8 +122,16 @@ public class UpgradeCatalog210Test {
   public void testExecuteDDLUpdates() throws Exception {
     final DBAccessor dbAccessor = createNiceMock(DBAccessor.class);
     Configuration configuration = createNiceMock(Configuration.class);
+    Connection connection = createNiceMock(Connection.class);
+    Statement statement = createNiceMock(Statement.class);
     ResultSet resultSet = createNiceMock(ResultSet.class);
     expect(configuration.getDatabaseUrl()).andReturn(Configuration.JDBC_IN_MEMORY_URL).anyTimes();
+    dbAccessor.getConnection();
+    expectLastCall().andReturn(connection).anyTimes();
+    connection.createStatement();
+    expectLastCall().andReturn(statement).anyTimes();
+    statement.executeQuery(anyObject(String.class));
+    expectLastCall().andReturn(resultSet).anyTimes();
 
     // Create DDL sections with their own capture groups
     AlertSectionDDL alertSectionDDL = new AlertSectionDDL();
@@ -135,7 +146,7 @@ public class UpgradeCatalog210Test {
     viewSectionDDL.execute(dbAccessor);
 
     // Replay sections
-    replay(dbAccessor, configuration, resultSet);
+    replay(dbAccessor, configuration, resultSet, connection, statement);
 
     AbstractUpgradeCatalog upgradeCatalog = getUpgradeCatalog(dbAccessor);
     Class<?> c = AbstractUpgradeCatalog.class;
@@ -144,7 +155,7 @@ public class UpgradeCatalog210Test {
     f.set(upgradeCatalog, configuration);
 
     upgradeCatalog.executeDDLUpdates();
-    verify(dbAccessor, configuration, resultSet);
+    verify(dbAccessor, configuration, resultSet, connection, statement);
 
     // Verify sections
     alertSectionDDL.verify(dbAccessor);