|
@@ -33,10 +33,12 @@ import java.util.TimeZone;
|
|
|
import org.apache.commons.lang3.NotImplementedException;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.security.token.delegation.DelegationKey;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
+import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
|
|
@@ -86,11 +88,19 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyReq
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenRequest;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
|
|
|
-import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationRouterRMTokenInputValidator;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.sql.FederationSQLOutParameter;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.sql.FederationQueryRunner;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.sql.RouterMasterKeyHandler;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.sql.RouterStoreTokenHandler;
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.sql.RowCountHandler;
|
|
|
import org.apache.hadoop.yarn.server.records.Version;
|
|
|
import org.apache.hadoop.yarn.util.Clock;
|
|
|
import org.apache.hadoop.yarn.util.MonotonicClock;
|
|
@@ -100,6 +110,13 @@ import org.slf4j.LoggerFactory;
|
|
|
import org.apache.hadoop.classification.VisibleForTesting;
|
|
|
import com.zaxxer.hikari.HikariDataSource;
|
|
|
|
|
|
+import static java.sql.Types.INTEGER;
|
|
|
+import static java.sql.Types.VARCHAR;
|
|
|
+import static java.sql.Types.BIGINT;
|
|
|
+import static org.apache.hadoop.yarn.server.federation.store.sql.FederationQueryRunner.YARN_ROUTER_CURRENT_KEY_ID;
|
|
|
+import static org.apache.hadoop.yarn.server.federation.store.sql.FederationQueryRunner.YARN_ROUTER_SEQUENCE_NUM;
|
|
|
+import static org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils.convertMasterKeyToDelegationKey;
|
|
|
+
|
|
|
/**
|
|
|
* SQL implementation of {@link FederationStateStore}.
|
|
|
*/
|
|
@@ -164,6 +181,27 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
protected static final String CALL_SP_UPDATE_RESERVATION_HOME_SUBCLUSTER =
|
|
|
"{call sp_updateReservationHomeSubCluster(?, ?, ?)}";
|
|
|
|
|
|
+ protected static final String CALL_SP_ADD_MASTERKEY =
|
|
|
+ "{call sp_addMasterKey(?, ?, ?)}";
|
|
|
+
|
|
|
+ protected static final String CALL_SP_GET_MASTERKEY =
|
|
|
+ "{call sp_getMasterKey(?, ?)}";
|
|
|
+
|
|
|
+ protected static final String CALL_SP_DELETE_MASTERKEY =
|
|
|
+ "{call sp_deleteMasterKey(?, ?)}";
|
|
|
+
|
|
|
+ protected static final String CALL_SP_ADD_DELEGATIONTOKEN =
|
|
|
+ "{call sp_addDelegationToken(?, ?, ?, ?, ?)}";
|
|
|
+
|
|
|
+ protected static final String CALL_SP_GET_DELEGATIONTOKEN =
|
|
|
+ "{call sp_getDelegationToken(?, ?, ?, ?)}";
|
|
|
+
|
|
|
+ protected static final String CALL_SP_UPDATE_DELEGATIONTOKEN =
|
|
|
+ "{call sp_updateDelegationToken(?, ?, ?, ?, ?)}";
|
|
|
+
|
|
|
+ protected static final String CALL_SP_DELETE_DELEGATIONTOKEN =
|
|
|
+ "{call sp_deleteDelegationToken(?, ?)}";
|
|
|
+
|
|
|
private Calendar utcCalendar =
|
|
|
Calendar.getInstance(TimeZone.getTimeZone("UTC"));
|
|
|
|
|
@@ -247,7 +285,7 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
cstmt.setString("state_IN", subClusterInfo.getState().toString());
|
|
|
cstmt.setLong("lastStartTime_IN", subClusterInfo.getLastStartTime());
|
|
|
cstmt.setString("capability_IN", subClusterInfo.getCapability());
|
|
|
- cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
|
|
|
+ cstmt.registerOutParameter("rowCount_OUT", INTEGER);
|
|
|
|
|
|
// Execute the query
|
|
|
long startTime = clock.getTime();
|
|
@@ -302,7 +340,7 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
// Set the parameters for the stored procedure
|
|
|
cstmt.setString("subClusterId_IN", subClusterId.getId());
|
|
|
cstmt.setString("state_IN", state.toString());
|
|
|
- cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
|
|
|
+ cstmt.registerOutParameter("rowCount_OUT", INTEGER);
|
|
|
|
|
|
// Execute the query
|
|
|
long startTime = clock.getTime();
|
|
@@ -356,7 +394,7 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
cstmt.setString("subClusterId_IN", subClusterId.getId());
|
|
|
cstmt.setString("state_IN", state.toString());
|
|
|
cstmt.setString("capability_IN", subClusterHeartbeatRequest.getCapability());
|
|
|
- cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
|
|
|
+ cstmt.registerOutParameter("rowCount_OUT", INTEGER);
|
|
|
|
|
|
// Execute the query
|
|
|
long startTime = clock.getTime();
|
|
@@ -408,14 +446,14 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
cstmt.setString("subClusterId_IN", subClusterId.getId());
|
|
|
|
|
|
// Set the parameters for the stored procedure
|
|
|
- cstmt.registerOutParameter("amRMServiceAddress_OUT", java.sql.Types.VARCHAR);
|
|
|
- cstmt.registerOutParameter("clientRMServiceAddress_OUT", java.sql.Types.VARCHAR);
|
|
|
- cstmt.registerOutParameter("rmAdminServiceAddress_OUT", java.sql.Types.VARCHAR);
|
|
|
- cstmt.registerOutParameter("rmWebServiceAddress_OUT", java.sql.Types.VARCHAR);
|
|
|
+ cstmt.registerOutParameter("amRMServiceAddress_OUT", VARCHAR);
|
|
|
+ cstmt.registerOutParameter("clientRMServiceAddress_OUT", VARCHAR);
|
|
|
+ cstmt.registerOutParameter("rmAdminServiceAddress_OUT", VARCHAR);
|
|
|
+ cstmt.registerOutParameter("rmWebServiceAddress_OUT", VARCHAR);
|
|
|
cstmt.registerOutParameter("lastHeartBeat_OUT", java.sql.Types.TIMESTAMP);
|
|
|
- cstmt.registerOutParameter("state_OUT", java.sql.Types.VARCHAR);
|
|
|
- cstmt.registerOutParameter("lastStartTime_OUT", java.sql.Types.BIGINT);
|
|
|
- cstmt.registerOutParameter("capability_OUT", java.sql.Types.VARCHAR);
|
|
|
+ cstmt.registerOutParameter("state_OUT", VARCHAR);
|
|
|
+ cstmt.registerOutParameter("lastStartTime_OUT", BIGINT);
|
|
|
+ cstmt.registerOutParameter("capability_OUT", VARCHAR);
|
|
|
|
|
|
// Execute the query
|
|
|
long startTime = clock.getTime();
|
|
@@ -548,8 +586,8 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
// Set the parameters for the stored procedure
|
|
|
cstmt.setString("applicationId_IN", appId.toString());
|
|
|
cstmt.setString("homeSubCluster_IN", subClusterId.getId());
|
|
|
- cstmt.registerOutParameter("storedHomeSubCluster_OUT", java.sql.Types.VARCHAR);
|
|
|
- cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
|
|
|
+ cstmt.registerOutParameter("storedHomeSubCluster_OUT", VARCHAR);
|
|
|
+ cstmt.registerOutParameter("rowCount_OUT", INTEGER);
|
|
|
|
|
|
// Execute the query
|
|
|
long startTime = clock.getTime();
|
|
@@ -625,7 +663,7 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
// Set the parameters for the stored procedure
|
|
|
cstmt.setString("applicationId_IN", appId.toString());
|
|
|
cstmt.setString("homeSubCluster_IN", subClusterId.getId());
|
|
|
- cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
|
|
|
+ cstmt.registerOutParameter("rowCount_OUT", INTEGER);
|
|
|
|
|
|
// Execute the query
|
|
|
long startTime = clock.getTime();
|
|
@@ -677,7 +715,7 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
|
|
|
// Set the parameters for the stored procedure
|
|
|
cstmt.setString("applicationId_IN", applicationId.toString());
|
|
|
- cstmt.registerOutParameter("homeSubCluster_OUT", java.sql.Types.VARCHAR);
|
|
|
+ cstmt.registerOutParameter("homeSubCluster_OUT", VARCHAR);
|
|
|
|
|
|
// Execute the query
|
|
|
long startTime = clock.getTime();
|
|
@@ -775,7 +813,7 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
|
|
|
// Set the parameters for the stored procedure
|
|
|
cstmt.setString("applicationId_IN", applicationId.toString());
|
|
|
- cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
|
|
|
+ cstmt.registerOutParameter("rowCount_OUT", INTEGER);
|
|
|
|
|
|
// Execute the query
|
|
|
long startTime = clock.getTime();
|
|
@@ -825,7 +863,7 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
|
|
|
// Set the parameters for the stored procedure
|
|
|
cstmt.setString("queue_IN", request.getQueue());
|
|
|
- cstmt.registerOutParameter("policyType_OUT", java.sql.Types.VARCHAR);
|
|
|
+ cstmt.registerOutParameter("policyType_OUT", VARCHAR);
|
|
|
cstmt.registerOutParameter("params_OUT", java.sql.Types.VARBINARY);
|
|
|
|
|
|
// Execute the query
|
|
@@ -877,7 +915,7 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
cstmt.setString("queue_IN", policyConf.getQueue());
|
|
|
cstmt.setString("policyType_IN", policyConf.getType());
|
|
|
cstmt.setBytes("params_IN", getByteArray(policyConf.getParams()));
|
|
|
- cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
|
|
|
+ cstmt.registerOutParameter("rowCount_OUT", INTEGER);
|
|
|
|
|
|
// Execute the query
|
|
|
long startTime = clock.getTime();
|
|
@@ -984,6 +1022,22 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
return dataSource.getConnection();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Get a connection from the DataSource pool.
|
|
|
+ *
|
|
|
+ * @param isCommitted Whether to enable automatic transaction commit.
|
|
|
+ * If set to true, turn on transaction autocommit,
|
|
|
+ * if set to false, turn off transaction autocommit.
|
|
|
+ *
|
|
|
+ * @return a connection from the DataSource pool.
|
|
|
+ * @throws SQLException on failure.
|
|
|
+ */
|
|
|
+ protected Connection getConnection(boolean isCommitted) throws SQLException {
|
|
|
+ Connection dbConn = getConnection();
|
|
|
+ dbConn.setAutoCommit(isCommitted);
|
|
|
+ return dbConn;
|
|
|
+ }
|
|
|
+
|
|
|
@VisibleForTesting
|
|
|
protected CallableStatement getCallableStatement(String procedure)
|
|
|
throws SQLException {
|
|
@@ -1029,9 +1083,9 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
// 2)IN homeSubCluster_IN varchar(256)
|
|
|
cstmt.setString("homeSubCluster_IN", subClusterId.getId());
|
|
|
// 3) OUT storedHomeSubCluster_OUT varchar(256)
|
|
|
- cstmt.registerOutParameter("storedHomeSubCluster_OUT", java.sql.Types.VARCHAR);
|
|
|
+ cstmt.registerOutParameter("storedHomeSubCluster_OUT", VARCHAR);
|
|
|
// 4) OUT rowCount_OUT int
|
|
|
- cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
|
|
|
+ cstmt.registerOutParameter("rowCount_OUT", INTEGER);
|
|
|
|
|
|
// Execute the query
|
|
|
long startTime = clock.getTime();
|
|
@@ -1119,7 +1173,7 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
// 1)IN reservationId_IN varchar(128)
|
|
|
cstmt.setString("reservationId_IN", reservationId.toString());
|
|
|
// 2)OUT homeSubCluster_OUT varchar(256)
|
|
|
- cstmt.registerOutParameter("homeSubCluster_OUT", java.sql.Types.VARCHAR);
|
|
|
+ cstmt.registerOutParameter("homeSubCluster_OUT", VARCHAR);
|
|
|
|
|
|
// Execute the query
|
|
|
long startTime = clock.getTime();
|
|
@@ -1237,7 +1291,7 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
// 1)IN reservationId_IN varchar(128)
|
|
|
cstmt.setString("reservationId_IN", reservationId.toString());
|
|
|
// 2)OUT rowCount_OUT int
|
|
|
- cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
|
|
|
+ cstmt.registerOutParameter("rowCount_OUT", INTEGER);
|
|
|
|
|
|
// Execute the query
|
|
|
long startTime = clock.getTime();
|
|
@@ -1306,7 +1360,7 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
// 2)IN homeSubCluster_IN varchar(256)
|
|
|
cstmt.setString("homeSubCluster_IN", subClusterId.getId());
|
|
|
// 3)OUT rowCount_OUT int
|
|
|
- cstmt.registerOutParameter("rowCount_OUT", java.sql.Types.INTEGER);
|
|
|
+ cstmt.registerOutParameter("rowCount_OUT", INTEGER);
|
|
|
|
|
|
// Execute the query
|
|
|
long startTime = clock.getTime();
|
|
@@ -1353,70 +1407,503 @@ public class SQLFederationStateStore implements FederationStateStore {
|
|
|
return conn;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * SQLFederationStateStore Supports Store New MasterKey.
|
|
|
+ *
|
|
|
+ * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey.
|
|
|
+ * @return routerMasterKeyResponse, the response contains the RouterMasterKey.
|
|
|
+ * @throws YarnException if the call to the state store is unsuccessful.
|
|
|
+ * @throws IOException An IO Error occurred.
|
|
|
+ */
|
|
|
@Override
|
|
|
public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request)
|
|
|
throws YarnException, IOException {
|
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
+
|
|
|
+ // Step1: Verify parameters to ensure that key fields are not empty.
|
|
|
+ FederationRouterRMTokenInputValidator.validate(request);
|
|
|
+
|
|
|
+ // Step2: Parse the parameters and serialize the DelegationKey as a string.
|
|
|
+ DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
|
|
|
+ int keyId = delegationKey.getKeyId();
|
|
|
+ String delegationKeyStr = FederationStateStoreUtils.encodeWritable(delegationKey);
|
|
|
+
|
|
|
+ // Step3. store data in database.
|
|
|
+ try {
|
|
|
+
|
|
|
+ FederationSQLOutParameter<Integer> rowCountOUT =
|
|
|
+ new FederationSQLOutParameter<>("rowCount_OUT", INTEGER, Integer.class);
|
|
|
+
|
|
|
+ // Execute the query
|
|
|
+ long startTime = clock.getTime();
|
|
|
+ Integer rowCount = getRowCountByProcedureSQL(CALL_SP_ADD_MASTERKEY, keyId,
|
|
|
+ delegationKeyStr, rowCountOUT);
|
|
|
+ long stopTime = clock.getTime();
|
|
|
+
|
|
|
+ // We hope that 1 record can be written to the database.
|
|
|
+ // If the number of records is not 1, it means that the data was written incorrectly.
|
|
|
+ if (rowCount != 1) {
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
|
|
|
+ "Wrong behavior during the insertion of masterKey, keyId = %s. " +
|
|
|
+ "please check the records of the database.", String.valueOf(keyId));
|
|
|
+ }
|
|
|
+ FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
|
|
|
+ } catch (SQLException e) {
|
|
|
+ FederationStateStoreClientMetrics.failedStateStoreCall();
|
|
|
+ FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
|
|
|
+ "Unable to insert the newly masterKey, keyId = %s.", String.valueOf(keyId));
|
|
|
+ }
|
|
|
+
|
|
|
+ // Step4. Query Data from the database and return the result.
|
|
|
+ return getMasterKeyByDelegationKey(request);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * SQLFederationStateStore Supports Remove MasterKey.
|
|
|
+ *
|
|
|
+ * Defined the sp_deleteMasterKey procedure.
|
|
|
+ * This procedure requires 1 input parameters, 1 output parameters.
|
|
|
+ * Input parameters
|
|
|
+ * 1. IN keyId_IN int
|
|
|
+ * Output parameters
|
|
|
+ * 2. OUT rowCount_OUT int
|
|
|
+ *
|
|
|
+ * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey
|
|
|
+ * @return routerMasterKeyResponse, the response contains the RouterMasterKey.
|
|
|
+ * @throws YarnException if the call to the state store is unsuccessful.
|
|
|
+ * @throws IOException An IO Error occurred.
|
|
|
+ */
|
|
|
@Override
|
|
|
public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request)
|
|
|
throws YarnException, IOException {
|
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
+
|
|
|
+ // Step1: Verify parameters to ensure that key fields are not empty.
|
|
|
+ FederationRouterRMTokenInputValidator.validate(request);
|
|
|
+
|
|
|
+ // Step2: Parse parameters and get KeyId.
|
|
|
+ RouterMasterKey paramMasterKey = request.getRouterMasterKey();
|
|
|
+ int paramKeyId = paramMasterKey.getKeyId();
|
|
|
+
|
|
|
+ // Step3. Clear data from database.
|
|
|
+ try {
|
|
|
+
|
|
|
+ // Execute the query
|
|
|
+ long startTime = clock.getTime();
|
|
|
+ FederationSQLOutParameter<Integer> rowCountOUT =
|
|
|
+ new FederationSQLOutParameter<>("rowCount_OUT", INTEGER, Integer.class);
|
|
|
+ Integer rowCount = getRowCountByProcedureSQL(CALL_SP_DELETE_MASTERKEY,
|
|
|
+ paramKeyId, rowCountOUT);
|
|
|
+ long stopTime = clock.getTime();
|
|
|
+
|
|
|
+ // if it is equal to 0 it means the call
|
|
|
+ // did not delete the reservation from FederationStateStore
|
|
|
+ if (rowCount == 0) {
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
|
|
|
+ "masterKeyId = %s does not exist.", String.valueOf(paramKeyId));
|
|
|
+ } else if (rowCount != 1) {
|
|
|
+ // if it is different from 1 it means the call
|
|
|
+ // had a wrong behavior. Maybe the database is not set correctly.
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
|
|
|
+ "Wrong behavior during deleting the keyId %s. " +
|
|
|
+ "The database is expected to delete 1 record, " +
|
|
|
+ "but the number of deleted records returned by the database is greater than 1, " +
|
|
|
+ "indicating that a duplicate masterKey occurred during the deletion process.",
|
|
|
+ paramKeyId);
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG.info("Delete from the StateStore the keyId: {}.", paramKeyId);
|
|
|
+ FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
|
|
|
+ return RouterMasterKeyResponse.newInstance(paramMasterKey);
|
|
|
+ } catch (SQLException e) {
|
|
|
+ FederationStateStoreClientMetrics.failedStateStoreCall();
|
|
|
+ FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
|
|
|
+ "Unable to delete the keyId %s.", paramKeyId);
|
|
|
+ }
|
|
|
+
|
|
|
+ throw new YarnException("Unable to delete the masterKey, keyId = " + paramKeyId);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * SQLFederationStateStore Supports Remove MasterKey.
|
|
|
+ *
|
|
|
+ * Defined the sp_getMasterKey procedure.
|
|
|
+ * this procedure requires 2 parameters.
|
|
|
+ * Input parameters:
|
|
|
+ * 1. IN keyId_IN int
|
|
|
+ * Output parameters:
|
|
|
+ * 2. OUT masterKey_OUT varchar(1024)
|
|
|
+ *
|
|
|
+ * @param request The request contains RouterMasterKey, which is an abstraction for DelegationKey
|
|
|
+ * @return routerMasterKeyResponse, the response contains the RouterMasterKey.
|
|
|
+ * @throws YarnException if the call to the state store is unsuccessful.
|
|
|
+ * @throws IOException An IO Error occurred.
|
|
|
+ */
|
|
|
@Override
|
|
|
public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request)
|
|
|
throws YarnException, IOException {
|
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
+ // Step1: Verify parameters to ensure that key fields are not empty.
|
|
|
+ FederationRouterRMTokenInputValidator.validate(request);
|
|
|
+
|
|
|
+ // Step2: Parse parameters and get KeyId.
|
|
|
+ RouterMasterKey paramMasterKey = request.getRouterMasterKey();
|
|
|
+ int paramKeyId = paramMasterKey.getKeyId();
|
|
|
+
|
|
|
+ // Step3: Call the stored procedure to get the result.
|
|
|
+ try {
|
|
|
+
|
|
|
+ FederationQueryRunner runner = new FederationQueryRunner();
|
|
|
+ FederationSQLOutParameter<String> masterKeyOUT =
|
|
|
+ new FederationSQLOutParameter<>("masterKey_OUT", VARCHAR, String.class);
|
|
|
+
|
|
|
+ // Execute the query
|
|
|
+ long startTime = clock.getTime();
|
|
|
+ RouterMasterKey routerMasterKey = runner.execute(
|
|
|
+ conn, CALL_SP_GET_MASTERKEY, new RouterMasterKeyHandler(), paramKeyId, masterKeyOUT);
|
|
|
+ long stopTime = clock.getTime();
|
|
|
+
|
|
|
+ LOG.info("Got the information about the specified masterKey = {} according to keyId = {}.",
|
|
|
+ routerMasterKey, paramKeyId);
|
|
|
+
|
|
|
+ FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
|
|
|
+
|
|
|
+ // Return query result.
|
|
|
+ return RouterMasterKeyResponse.newInstance(routerMasterKey);
|
|
|
+
|
|
|
+ } catch (SQLException e) {
|
|
|
+ FederationStateStoreClientMetrics.failedStateStoreCall();
|
|
|
+ FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
|
|
|
+ "Unable to obtain the masterKey information according to %s.",
|
|
|
+ String.valueOf(paramKeyId));
|
|
|
+ }
|
|
|
+
|
|
|
+ // Throw exception information
|
|
|
+ throw new YarnException(
|
|
|
+ "Unable to obtain the masterKey information according to " + paramKeyId);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * SQLFederationStateStore Supports Store RMDelegationTokenIdentifier.
|
|
|
+ *
|
|
|
+ * Defined the sp_addDelegationToken procedure.
|
|
|
+ * This procedure requires 4 input parameters, 1 output parameters.
|
|
|
+ * Input parameters:
|
|
|
+ * 1. IN sequenceNum_IN int
|
|
|
+ * 2. IN tokenIdent_IN varchar(1024)
|
|
|
+ * 3. IN token_IN varchar(1024)
|
|
|
+ * 4. IN renewDate_IN bigint
|
|
|
+ * Output parameters:
|
|
|
+ * 5. OUT rowCount_OUT int
|
|
|
+ *
|
|
|
+ * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
|
|
|
+ * @return routerRMTokenResponse, the response contains the RouterStoreToken.
|
|
|
+ * @throws YarnException if the call to the state store is unsuccessful.
|
|
|
+ * @throws IOException An IO Error occurred.
|
|
|
+ */
|
|
|
@Override
|
|
|
public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request)
|
|
|
throws YarnException, IOException {
|
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
+
|
|
|
+ // Step1: Verify parameters to ensure that key fields are not empty.
|
|
|
+ FederationRouterRMTokenInputValidator.validate(request);
|
|
|
+
|
|
|
+ // Step2. store data in database.
|
|
|
+ try {
|
|
|
+ long duration = addOrUpdateToken(request, true);
|
|
|
+ FederationStateStoreClientMetrics.succeededStateStoreCall(duration);
|
|
|
+ } catch (SQLException e) {
|
|
|
+ FederationStateStoreClientMetrics.failedStateStoreCall();
|
|
|
+ throw new YarnException(e);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Step3. Query Data from the database and return the result.
|
|
|
+ return getTokenByRouterStoreToken(request);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * SQLFederationStateStore Supports Update RMDelegationTokenIdentifier.
|
|
|
+ *
|
|
|
+ * Defined the sp_updateDelegationToken procedure.
|
|
|
+ * This procedure requires 4 input parameters, 1 output parameters.
|
|
|
+ * Input parameters:
|
|
|
+ * 1. IN sequenceNum_IN int
|
|
|
+ * 2. IN tokenIdent_IN varchar(1024)
|
|
|
+ * 3. IN token_IN varchar(1024)
|
|
|
+ * 4. IN renewDate_IN bigint
|
|
|
+ * Output parameters:
|
|
|
+ * 5. OUT rowCount_OUT int
|
|
|
+ *
|
|
|
+ * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
|
|
|
+ * @return routerRMTokenResponse, the response contains the RouterStoreToken.
|
|
|
+ * @throws YarnException if the call to the state store is unsuccessful.
|
|
|
+ * @throws IOException An IO Error occurred.
|
|
|
+ */
|
|
|
@Override
|
|
|
public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request)
|
|
|
throws YarnException, IOException {
|
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
+
|
|
|
+ // Step1: Verify parameters to ensure that key fields are not empty.
|
|
|
+ FederationRouterRMTokenInputValidator.validate(request);
|
|
|
+
|
|
|
+ // Step2. update data in database.
|
|
|
+ try {
|
|
|
+ long duration = addOrUpdateToken(request, false);
|
|
|
+ FederationStateStoreClientMetrics.succeededStateStoreCall(duration);
|
|
|
+ } catch (SQLException e) {
|
|
|
+ FederationStateStoreClientMetrics.failedStateStoreCall();
|
|
|
+ throw new YarnException(e);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Step3. Query Data from the database and return the result.
|
|
|
+ return getTokenByRouterStoreToken(request);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Add Or Update RMDelegationTokenIdentifier.
|
|
|
+ *
|
|
|
+ * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
|
|
|
+ * @param isAdd true, addData; false, updateData.
|
|
|
+ * @return method operation time.
|
|
|
+ * @throws IOException An IO Error occurred.
|
|
|
+ * @throws SQLException An SQL Error occurred.
|
|
|
+ * @throws YarnException if the call to the state store is unsuccessful.
|
|
|
+ */
|
|
|
+ private long addOrUpdateToken(RouterRMTokenRequest request, boolean isAdd)
|
|
|
+ throws IOException, SQLException, YarnException {
|
|
|
+
|
|
|
+ // Parse parameters and get KeyId.
|
|
|
+ RouterStoreToken routerStoreToken = request.getRouterStoreToken();
|
|
|
+ YARNDelegationTokenIdentifier identifier = routerStoreToken.getTokenIdentifier();
|
|
|
+ String tokenIdentifier = FederationStateStoreUtils.encodeWritable(identifier);
|
|
|
+ String tokenInfo = routerStoreToken.getTokenInfo();
|
|
|
+ long renewDate = routerStoreToken.getRenewDate();
|
|
|
+ int sequenceNum = identifier.getSequenceNumber();
|
|
|
+
|
|
|
+ FederationQueryRunner runner = new FederationQueryRunner();
|
|
|
+ FederationSQLOutParameter<Integer> rowCountOUT =
|
|
|
+ new FederationSQLOutParameter<>("rowCount_OUT", INTEGER, Integer.class);
|
|
|
+
|
|
|
+ // Execute the query
|
|
|
+ long startTime = clock.getTime();
|
|
|
+ String procedure = isAdd ? CALL_SP_ADD_DELEGATIONTOKEN : CALL_SP_UPDATE_DELEGATIONTOKEN;
|
|
|
+ Integer rowCount = runner.execute(conn, procedure, new RowCountHandler("rowCount_OUT"),
|
|
|
+ sequenceNum, tokenIdentifier, tokenInfo, renewDate, rowCountOUT);
|
|
|
+ long stopTime = clock.getTime();
|
|
|
+
|
|
|
+ // Get rowCount
|
|
|
+ // In the process of updating the code, rowCount may be 0 or 1;
|
|
|
+ // if rowCount=1, it is as expected, indicating that we have updated the Token correctly;
|
|
|
+ // if rowCount=0, it is not as expected,
|
|
|
+ // indicating that we have not updated the Token correctly.
|
|
|
+ if (rowCount != 1) {
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
|
|
|
+ "Wrong behavior during the insertion of delegationToken, tokenId = %s. " +
|
|
|
+ "Please check the records of the database.", String.valueOf(sequenceNum));
|
|
|
+ }
|
|
|
+
|
|
|
+ // return execution time
|
|
|
+ return (stopTime - startTime);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * SQLFederationStateStore Supports Remove RMDelegationTokenIdentifier.
|
|
|
+ *
|
|
|
+ * Defined the sp_deleteDelegationToken procedure.
|
|
|
+ * This procedure requires 1 input parameters, 1 output parameters.
|
|
|
+ * Input parameters:
|
|
|
+ * 1. IN sequenceNum_IN bigint
|
|
|
+ * Output parameters:
|
|
|
+ * 2. OUT rowCount_OUT int
|
|
|
+ *
|
|
|
+ * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
|
|
|
+ * @return routerRMTokenResponse, the response contains the RouterStoreToken.
|
|
|
+ * @throws YarnException if the call to the state store is unsuccessful.
|
|
|
+ * @throws IOException An IO Error occurred.
|
|
|
+ */
|
|
|
@Override
|
|
|
public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
|
|
|
throws YarnException, IOException {
|
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
+
|
|
|
+ // Step1: Verify parameters to ensure that key fields are not empty.
|
|
|
+ FederationRouterRMTokenInputValidator.validate(request);
|
|
|
+
|
|
|
+ // Step2: Parse parameters and get KeyId.
|
|
|
+ RouterStoreToken routerStoreToken = request.getRouterStoreToken();
|
|
|
+ YARNDelegationTokenIdentifier identifier = routerStoreToken.getTokenIdentifier();
|
|
|
+ int sequenceNum = identifier.getSequenceNumber();
|
|
|
+
|
|
|
+ try {
|
|
|
+
|
|
|
+ FederationSQLOutParameter<Integer> rowCountOUT =
|
|
|
+ new FederationSQLOutParameter<>("rowCount_OUT", INTEGER, Integer.class);
|
|
|
+
|
|
|
+ // Execute the query
|
|
|
+ long startTime = clock.getTime();
|
|
|
+ Integer rowCount = getRowCountByProcedureSQL(CALL_SP_DELETE_DELEGATIONTOKEN,
|
|
|
+ sequenceNum, rowCountOUT);
|
|
|
+ long stopTime = clock.getTime();
|
|
|
+
|
|
|
+ // if it is equal to 0 it means the call
|
|
|
+ // did not delete the reservation from FederationStateStore
|
|
|
+ if (rowCount == 0) {
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
|
|
|
+ "TokenId %s does not exist", String.valueOf(sequenceNum));
|
|
|
+ } else if (rowCount != 1) {
|
|
|
+ // if it is different from 1 it means the call
|
|
|
+ // had a wrong behavior. Maybe the database is not set correctly.
|
|
|
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
|
|
|
+ "Wrong behavior during deleting the delegationToken %s. " +
|
|
|
+ "The database is expected to delete 1 record, " +
|
|
|
+ "but the number of deleted records returned by the database is greater than 1, " +
|
|
|
+ "indicating that a duplicate tokenId occurred during the deletion process.",
|
|
|
+ String.valueOf(sequenceNum));
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG.info("Delete from the StateStore the delegationToken, tokenId = {}.", sequenceNum);
|
|
|
+ FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
|
|
|
+ return RouterRMTokenResponse.newInstance(routerStoreToken);
|
|
|
+ } catch (SQLException e) {
|
|
|
+ FederationStateStoreClientMetrics.failedStateStoreCall();
|
|
|
+ FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
|
|
|
+ "Unable to delete the delegationToken, tokenId = %s.", sequenceNum);
|
|
|
+ }
|
|
|
+ throw new YarnException("Unable to delete the delegationToken, tokenId = " + sequenceNum);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * The Router Supports GetTokenByRouterStoreToken.
|
|
|
+ *
|
|
|
+ * @param request The request contains RouterRMToken (RMDelegationTokenIdentifier and renewDate)
|
|
|
+ * @return RouterRMTokenResponse.
|
|
|
+ * @throws YarnException if the call to the state store is unsuccessful.
|
|
|
+ * @throws IOException An IO Error occurred.
|
|
|
+ */
|
|
|
@Override
|
|
|
public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request)
|
|
|
throws YarnException, IOException {
|
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
+ // Step1: Verify parameters to ensure that key fields are not empty.
|
|
|
+ FederationRouterRMTokenInputValidator.validate(request);
|
|
|
+
|
|
|
+ // Step2: Parse parameters and get KeyId.
|
|
|
+ RouterStoreToken routerStoreToken = request.getRouterStoreToken();
|
|
|
+ YARNDelegationTokenIdentifier identifier = routerStoreToken.getTokenIdentifier();
|
|
|
+ int sequenceNum = identifier.getSequenceNumber();
|
|
|
+
|
|
|
+ try {
|
|
|
+ FederationQueryRunner runner = new FederationQueryRunner();
|
|
|
+ FederationSQLOutParameter<String> tokenIdentOUT =
|
|
|
+ new FederationSQLOutParameter<>("tokenIdent_OUT", VARCHAR, String.class);
|
|
|
+ FederationSQLOutParameter<String> tokenOUT =
|
|
|
+ new FederationSQLOutParameter<>("token_OUT", VARCHAR, String.class);
|
|
|
+ FederationSQLOutParameter<Long> renewDateOUT =
|
|
|
+ new FederationSQLOutParameter<>("renewDate_OUT", BIGINT, Long.class);
|
|
|
+
|
|
|
+ // Execute the query
|
|
|
+ long startTime = clock.getTime();
|
|
|
+ RouterStoreToken resultToken = runner.execute(conn, CALL_SP_GET_DELEGATIONTOKEN,
|
|
|
+ new RouterStoreTokenHandler(), sequenceNum, tokenIdentOUT, tokenOUT, renewDateOUT);
|
|
|
+ long stopTime = clock.getTime();
|
|
|
+
|
|
|
+ FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
|
|
|
+ return RouterRMTokenResponse.newInstance(resultToken);
|
|
|
+ } catch (SQLException e) {
|
|
|
+ FederationStateStoreClientMetrics.failedStateStoreCall();
|
|
|
+ FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
|
|
|
+ "Unable to get the delegationToken, tokenId = %s.", String.valueOf(sequenceNum));
|
|
|
+ }
|
|
|
+
|
|
|
+ // Throw exception information
|
|
|
+ throw new YarnException("Unable to get the delegationToken, tokenId = " + sequenceNum);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Call Procedure to get RowCount.
|
|
|
+ *
|
|
|
+ * @param procedure procedureSQL.
|
|
|
+ * @param params procedure params.
|
|
|
+ * @return RowCount.
|
|
|
+ * @throws SQLException An exception occurred when calling a stored procedure.
|
|
|
+ */
|
|
|
+ private int getRowCountByProcedureSQL(String procedure, Object... params) throws SQLException {
|
|
|
+ FederationQueryRunner runner = new FederationQueryRunner();
|
|
|
+ // Execute the query
|
|
|
+ Integer rowCount = runner.execute(conn, procedure,
|
|
|
+ new RowCountHandler("rowCount_OUT"), params);
|
|
|
+ return rowCount;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Increment DelegationToken SeqNum.
|
|
|
+ *
|
|
|
+ * @return delegationTokenSeqNum.
|
|
|
+ */
|
|
|
@Override
|
|
|
public int incrementDelegationTokenSeqNum() {
|
|
|
- return 0;
|
|
|
+ return querySequenceTable(YARN_ROUTER_SEQUENCE_NUM, true);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Get DelegationToken SeqNum.
|
|
|
+ *
|
|
|
+ * @return delegationTokenSeqNum.
|
|
|
+ */
|
|
|
@Override
|
|
|
public int getDelegationTokenSeqNum() {
|
|
|
- return 0;
|
|
|
+ return querySequenceTable(YARN_ROUTER_SEQUENCE_NUM, false);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void setDelegationTokenSeqNum(int seqNum) {
|
|
|
- return;
|
|
|
+ Connection connection = null;
|
|
|
+ try {
|
|
|
+ connection = getConnection(false);
|
|
|
+ FederationQueryRunner runner = new FederationQueryRunner();
|
|
|
+ runner.updateSequenceTable(connection, YARN_ROUTER_SEQUENCE_NUM, seqNum);
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException("Could not update sequence table!!", e);
|
|
|
+ } finally {
|
|
|
+ // Return to the pool the CallableStatement
|
|
|
+ try {
|
|
|
+ FederationStateStoreUtils.returnToPool(LOG, null, connection);
|
|
|
+ } catch (YarnException e) {
|
|
|
+ LOG.error("close connection error.", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Get Current KeyId.
|
|
|
+ *
|
|
|
+ * @return currentKeyId.
|
|
|
+ */
|
|
|
@Override
|
|
|
public int getCurrentKeyId() {
|
|
|
- return 0;
|
|
|
+ return querySequenceTable(YARN_ROUTER_CURRENT_KEY_ID, false);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * The Router Supports incrementCurrentKeyId.
|
|
|
+ *
|
|
|
+ * @return CurrentKeyId.
|
|
|
+ */
|
|
|
@Override
|
|
|
public int incrementCurrentKeyId() {
|
|
|
- return 0;
|
|
|
+ return querySequenceTable(YARN_ROUTER_CURRENT_KEY_ID, true);
|
|
|
+ }
|
|
|
+
|
|
|
+ private int querySequenceTable(String sequenceName, boolean isUpdate){
|
|
|
+ Connection connection = null;
|
|
|
+ try {
|
|
|
+ connection = getConnection(false);
|
|
|
+ FederationQueryRunner runner = new FederationQueryRunner();
|
|
|
+ return runner.selectOrUpdateSequenceTable(connection, sequenceName, isUpdate);
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException("Could not query sequence table!!", e);
|
|
|
+ } finally {
|
|
|
+ // Return to the pool the CallableStatement
|
|
|
+ try {
|
|
|
+ FederationStateStoreUtils.returnToPool(LOG, null, connection);
|
|
|
+ } catch (YarnException e) {
|
|
|
+ LOG.error("close connection error.", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|