|
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
|
|
@@ -72,6 +73,8 @@ import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembership
|
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
|
|
|
import org.apache.hadoop.yarn.server.records.Version;
|
|
|
+import org.apache.hadoop.yarn.util.Clock;
|
|
|
+import org.apache.hadoop.yarn.util.MonotonicClock;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -137,6 +140,7 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
private String url;
|
|
|
private int maximumPoolSize;
|
|
|
private HikariDataSource dataSource = null;
|
|
|
+ private final Clock clock = new MonotonicClock();
|
|
|
|
|
|
@Override
|
|
|
public void init(Configuration conf) throws YarnException {
|
|
@@ -203,7 +207,9 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
cstmt.registerOutParameter(9, java.sql.Types.INTEGER);
|
|
|
|
|
|
// Execute the query
|
|
|
+ long startTime = clock.getTime();
|
|
|
cstmt.executeUpdate();
|
|
|
+ long stopTime = clock.getTime();
|
|
|
|
|
|
// Check the ROWCOUNT value, if it is equal to 0 it means the call
|
|
|
// did not add a new subcluster into FederationStateStore
|
|
@@ -222,8 +228,11 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
|
|
|
LOG.info(
|
|
|
"Registered the SubCluster " + subClusterId + " into the StateStore");
|
|
|
+ FederationStateStoreClientMetrics
|
|
|
+ .succeededStateStoreCall(stopTime - startTime);
|
|
|
|
|
|
} catch (SQLException e) {
|
|
|
+ FederationStateStoreClientMetrics.failedStateStoreCall();
|
|
|
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
|
|
|
"Unable to register the SubCluster " + subClusterId
|
|
|
+ " into the StateStore",
|
|
@@ -260,7 +269,9 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
cstmt.registerOutParameter(3, java.sql.Types.INTEGER);
|
|
|
|
|
|
// Execute the query
|
|
|
+ long startTime = clock.getTime();
|
|
|
cstmt.executeUpdate();
|
|
|
+ long stopTime = clock.getTime();
|
|
|
|
|
|
// Check the ROWCOUNT value, if it is equal to 0 it means the call
|
|
|
// did not deregister the subcluster into FederationStateStore
|
|
@@ -278,8 +289,11 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
|
|
|
LOG.info("Deregistered the SubCluster " + subClusterId + " state to "
|
|
|
+ state.toString());
|
|
|
+ FederationStateStoreClientMetrics
|
|
|
+ .succeededStateStoreCall(stopTime - startTime);
|
|
|
|
|
|
} catch (SQLException e) {
|
|
|
+ FederationStateStoreClientMetrics.failedStateStoreCall();
|
|
|
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
|
|
|
"Unable to deregister the sub-cluster " + subClusterId + " state to "
|
|
|
+ state.toString(),
|
|
@@ -317,7 +331,9 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
|
|
|
|
|
|
// Execute the query
|
|
|
+ long startTime = clock.getTime();
|
|
|
cstmt.executeUpdate();
|
|
|
+ long stopTime = clock.getTime();
|
|
|
|
|
|
// Check the ROWCOUNT value, if it is equal to 0 it means the call
|
|
|
// did not update the subcluster into FederationStateStore
|
|
@@ -336,8 +352,11 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
|
|
|
LOG.info("Heartbeated the StateStore for the specified SubCluster "
|
|
|
+ subClusterId);
|
|
|
+ FederationStateStoreClientMetrics
|
|
|
+ .succeededStateStoreCall(stopTime - startTime);
|
|
|
|
|
|
} catch (SQLException e) {
|
|
|
+ FederationStateStoreClientMetrics.failedStateStoreCall();
|
|
|
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
|
|
|
"Unable to heartbeat the StateStore for the specified SubCluster "
|
|
|
+ subClusterId,
|
|
@@ -378,7 +397,9 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
cstmt.registerOutParameter(9, java.sql.Types.VARCHAR);
|
|
|
|
|
|
// Execute the query
|
|
|
+ long startTime = clock.getTime();
|
|
|
cstmt.execute();
|
|
|
+ long stopTime = clock.getTime();
|
|
|
|
|
|
String amRMAddress = cstmt.getString(2);
|
|
|
String clientRMAddress = cstmt.getString(3);
|
|
@@ -403,6 +424,9 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
clientRMAddress, rmAdminAddress, webAppAddress, lastHeartBeat, state,
|
|
|
lastStartTime, capability);
|
|
|
|
|
|
+ FederationStateStoreClientMetrics
|
|
|
+ .succeededStateStoreCall(stopTime - startTime);
|
|
|
+
|
|
|
// Check if the output it is a valid subcluster
|
|
|
try {
|
|
|
FederationMembershipStateStoreInputValidator
|
|
@@ -417,6 +441,7 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
+ subClusterInfo.toString());
|
|
|
}
|
|
|
} catch (SQLException e) {
|
|
|
+ FederationStateStoreClientMetrics.failedStateStoreCall();
|
|
|
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
|
|
|
"Unable to obtain the SubCluster information for " + subClusterId, e);
|
|
|
} finally {
|
|
@@ -439,7 +464,9 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTERS);
|
|
|
|
|
|
// Execute the query
|
|
|
+ long startTime = clock.getTime();
|
|
|
rs = cstmt.executeQuery();
|
|
|
+ long stopTime = clock.getTime();
|
|
|
|
|
|
while (rs.next()) {
|
|
|
|
|
@@ -459,6 +486,10 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress,
|
|
|
lastHeartBeat, state, lastStartTime, capability);
|
|
|
|
|
|
+ FederationStateStoreClientMetrics
|
|
|
+ .succeededStateStoreCall(stopTime - startTime);
|
|
|
+
|
|
|
+
|
|
|
// Check if the output it is a valid subcluster
|
|
|
try {
|
|
|
FederationMembershipStateStoreInputValidator
|
|
@@ -477,6 +508,7 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
}
|
|
|
|
|
|
} catch (SQLException e) {
|
|
|
+ FederationStateStoreClientMetrics.failedStateStoreCall();
|
|
|
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
|
|
|
"Unable to obtain the information for all the SubClusters ", e);
|
|
|
} finally {
|
|
@@ -513,11 +545,16 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
|
|
|
|
|
|
// Execute the query
|
|
|
+ long startTime = clock.getTime();
|
|
|
cstmt.executeUpdate();
|
|
|
+ long stopTime = clock.getTime();
|
|
|
|
|
|
subClusterHome = cstmt.getString(3);
|
|
|
SubClusterId subClusterIdHome = SubClusterId.newInstance(subClusterHome);
|
|
|
|
|
|
+ FederationStateStoreClientMetrics
|
|
|
+ .succeededStateStoreCall(stopTime - startTime);
|
|
|
+
|
|
|
// For failover reason, we check the returned SubClusterId.
|
|
|
// If it is equal to the subclusterId we sent, the call added the new
|
|
|
// application into FederationStateStore. If the call returns a different
|
|
@@ -554,6 +591,7 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
}
|
|
|
|
|
|
} catch (SQLException e) {
|
|
|
+ FederationStateStoreClientMetrics.failedStateStoreCall();
|
|
|
FederationStateStoreUtils
|
|
|
.logAndThrowRetriableException(LOG,
|
|
|
"Unable to insert the newly generated application "
|
|
@@ -592,7 +630,9 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
cstmt.registerOutParameter(3, java.sql.Types.INTEGER);
|
|
|
|
|
|
// Execute the query
|
|
|
+ long startTime = clock.getTime();
|
|
|
cstmt.executeUpdate();
|
|
|
+ long stopTime = clock.getTime();
|
|
|
|
|
|
// Check the ROWCOUNT value, if it is equal to 0 it means the call
|
|
|
// did not update the application into FederationStateStore
|
|
@@ -611,8 +651,11 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
LOG.info(
|
|
|
"Update the SubCluster to {} for application {} in the StateStore",
|
|
|
subClusterId, appId);
|
|
|
+ FederationStateStoreClientMetrics
|
|
|
+ .succeededStateStoreCall(stopTime - startTime);
|
|
|
|
|
|
} catch (SQLException e) {
|
|
|
+ FederationStateStoreClientMetrics.failedStateStoreCall();
|
|
|
FederationStateStoreUtils
|
|
|
.logAndThrowRetriableException(LOG,
|
|
|
"Unable to update the application "
|
|
@@ -645,7 +688,9 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
cstmt.registerOutParameter(2, java.sql.Types.VARCHAR);
|
|
|
|
|
|
// Execute the query
|
|
|
+ long startTime = clock.getTime();
|
|
|
cstmt.execute();
|
|
|
+ long stopTime = clock.getTime();
|
|
|
|
|
|
if (cstmt.getString(2) != null) {
|
|
|
homeRM = SubClusterId.newInstance(cstmt.getString(2));
|
|
@@ -659,7 +704,12 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
LOG.debug("Got the information about the specified application "
|
|
|
+ request.getApplicationId() + ". The AM is running in " + homeRM);
|
|
|
}
|
|
|
+
|
|
|
+ FederationStateStoreClientMetrics
|
|
|
+ .succeededStateStoreCall(stopTime - startTime);
|
|
|
+
|
|
|
} catch (SQLException e) {
|
|
|
+ FederationStateStoreClientMetrics.failedStateStoreCall();
|
|
|
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
|
|
|
"Unable to obtain the application information "
|
|
|
+ "for the specified application " + request.getApplicationId(),
|
|
@@ -688,7 +738,9 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
cstmt = conn.prepareCall(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER);
|
|
|
|
|
|
// Execute the query
|
|
|
+ long startTime = clock.getTime();
|
|
|
rs = cstmt.executeQuery();
|
|
|
+ long stopTime = clock.getTime();
|
|
|
|
|
|
while (rs.next()) {
|
|
|
|
|
@@ -701,7 +753,11 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
SubClusterId.newInstance(homeSubCluster)));
|
|
|
}
|
|
|
|
|
|
+ FederationStateStoreClientMetrics
|
|
|
+ .succeededStateStoreCall(stopTime - startTime);
|
|
|
+
|
|
|
} catch (SQLException e) {
|
|
|
+ FederationStateStoreClientMetrics.failedStateStoreCall();
|
|
|
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
|
|
|
"Unable to obtain the information for all the applications ", e);
|
|
|
} finally {
|
|
@@ -731,7 +787,9 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
cstmt.registerOutParameter(2, java.sql.Types.INTEGER);
|
|
|
|
|
|
// Execute the query
|
|
|
+ long startTime = clock.getTime();
|
|
|
cstmt.executeUpdate();
|
|
|
+ long stopTime = clock.getTime();
|
|
|
|
|
|
// Check the ROWCOUNT value, if it is equal to 0 it means the call
|
|
|
// did not delete the application from FederationStateStore
|
|
@@ -750,8 +808,11 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
|
|
|
LOG.info("Delete from the StateStore the application: {}",
|
|
|
request.getApplicationId());
|
|
|
+ FederationStateStoreClientMetrics
|
|
|
+ .succeededStateStoreCall(stopTime - startTime);
|
|
|
|
|
|
} catch (SQLException e) {
|
|
|
+ FederationStateStoreClientMetrics.failedStateStoreCall();
|
|
|
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
|
|
|
"Unable to delete the application " + request.getApplicationId(), e);
|
|
|
} finally {
|
|
@@ -782,7 +843,9 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
cstmt.registerOutParameter(3, java.sql.Types.VARBINARY);
|
|
|
|
|
|
// Execute the query
|
|
|
+ long startTime = clock.getTime();
|
|
|
cstmt.executeUpdate();
|
|
|
+ long stopTime = clock.getTime();
|
|
|
|
|
|
// Check if the output it is a valid policy
|
|
|
if (cstmt.getString(2) != null && cstmt.getBytes(3) != null) {
|
|
@@ -798,7 +861,11 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
+ FederationStateStoreClientMetrics
|
|
|
+ .succeededStateStoreCall(stopTime - startTime);
|
|
|
+
|
|
|
} catch (SQLException e) {
|
|
|
+ FederationStateStoreClientMetrics.failedStateStoreCall();
|
|
|
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
|
|
|
"Unable to select the policy for the queue :" + request.getQueue(),
|
|
|
e);
|
|
@@ -833,7 +900,9 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
|
|
|
|
|
|
// Execute the query
|
|
|
+ long startTime = clock.getTime();
|
|
|
cstmt.executeUpdate();
|
|
|
+ long stopTime = clock.getTime();
|
|
|
|
|
|
// Check the ROWCOUNT value, if it is equal to 0 it means the call
|
|
|
// did not add a new policy into FederationStateStore
|
|
@@ -852,8 +921,11 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
|
|
|
LOG.info("Insert into the state store the policy for the queue: "
|
|
|
+ policyConf.getQueue());
|
|
|
+ FederationStateStoreClientMetrics
|
|
|
+ .succeededStateStoreCall(stopTime - startTime);
|
|
|
|
|
|
} catch (SQLException e) {
|
|
|
+ FederationStateStoreClientMetrics.failedStateStoreCall();
|
|
|
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
|
|
|
"Unable to insert the newly generated policy for the queue :"
|
|
|
+ policyConf.getQueue(),
|
|
@@ -880,7 +952,9 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
cstmt = conn.prepareCall(CALL_SP_GET_POLICIES_CONFIGURATIONS);
|
|
|
|
|
|
// Execute the query
|
|
|
+ long startTime = clock.getTime();
|
|
|
rs = cstmt.executeQuery();
|
|
|
+ long stopTime = clock.getTime();
|
|
|
|
|
|
while (rs.next()) {
|
|
|
|
|
@@ -894,7 +968,12 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
ByteBuffer.wrap(policyInfo));
|
|
|
policyConfigurations.add(subClusterPolicyConfiguration);
|
|
|
}
|
|
|
+
|
|
|
+ FederationStateStoreClientMetrics
|
|
|
+ .succeededStateStoreCall(stopTime - startTime);
|
|
|
+
|
|
|
} catch (SQLException e) {
|
|
|
+ FederationStateStoreClientMetrics.failedStateStoreCall();
|
|
|
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
|
|
|
"Unable to obtain the policy information for all the queues.", e);
|
|
|
} finally {
|