|
@@ -31,9 +31,8 @@ import java.security.PrivateKey;
|
|
|
import java.security.cert.X509Certificate;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map.Entry;
|
|
|
-import java.util.Timer;
|
|
|
-import java.util.TimerTask;
|
|
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.DBManager;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -42,13 +41,11 @@ import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
|
|
-import org.apache.hadoop.util.Time;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
-import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
|
|
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
|
|
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
|
|
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
|
|
@@ -56,7 +53,6 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.Appl
|
|
|
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
|
|
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.server.records.Version;
|
|
|
-import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
|
@@ -66,8 +62,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AM
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
|
|
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
|
|
|
-import org.fusesource.leveldbjni.JniDBFactory;
|
|
|
-import org.fusesource.leveldbjni.internal.NativeDB;
|
|
|
import org.iq80.leveldb.DB;
|
|
|
import org.iq80.leveldb.DBException;
|
|
|
import org.iq80.leveldb.Options;
|
|
@@ -100,7 +94,7 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
.newInstance(1, 1);
|
|
|
|
|
|
private DB db;
|
|
|
- private Timer compactionTimer;
|
|
|
+ private DBManager dbManager = new DBManager();
|
|
|
private long compactionIntervalMsec;
|
|
|
|
|
|
private String getApplicationNodeKey(ApplicationId appId) {
|
|
@@ -140,7 +134,7 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- protected void initInternal(Configuration conf) throws Exception {
|
|
|
+ protected void initInternal(Configuration conf) {
|
|
|
compactionIntervalMsec = conf.getLong(
|
|
|
YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS,
|
|
|
YarnConfiguration.DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000;
|
|
@@ -165,55 +159,20 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
|
|
|
@Override
|
|
|
protected void startInternal() throws Exception {
|
|
|
- db = openDatabase();
|
|
|
- startCompactionTimer();
|
|
|
- }
|
|
|
-
|
|
|
- protected DB openDatabase() throws Exception {
|
|
|
Path storeRoot = createStorageDir();
|
|
|
Options options = new Options();
|
|
|
options.createIfMissing(false);
|
|
|
LOG.info("Using state database at " + storeRoot + " for recovery");
|
|
|
File dbfile = new File(storeRoot.toString());
|
|
|
- try {
|
|
|
- db = JniDBFactory.factory.open(dbfile, options);
|
|
|
- } catch (NativeDB.DBException e) {
|
|
|
- if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
|
|
|
- LOG.info("Creating state database at " + dbfile);
|
|
|
- options.createIfMissing(true);
|
|
|
- try {
|
|
|
- db = JniDBFactory.factory.open(dbfile, options);
|
|
|
- // store version
|
|
|
- storeVersion();
|
|
|
- } catch (DBException dbErr) {
|
|
|
- throw new IOException(dbErr.getMessage(), dbErr);
|
|
|
- }
|
|
|
- } else {
|
|
|
- throw e;
|
|
|
- }
|
|
|
- }
|
|
|
- return db;
|
|
|
- }
|
|
|
-
|
|
|
- private void startCompactionTimer() {
|
|
|
- if (compactionIntervalMsec > 0) {
|
|
|
- compactionTimer = new Timer(
|
|
|
- this.getClass().getSimpleName() + " compaction timer", true);
|
|
|
- compactionTimer.schedule(new CompactionTimerTask(),
|
|
|
- compactionIntervalMsec, compactionIntervalMsec);
|
|
|
- }
|
|
|
+ db = dbManager.initDatabase(dbfile, options, (database) ->
|
|
|
+ storeVersion(CURRENT_VERSION_INFO));
|
|
|
+ dbManager.startCompactionTimer(compactionIntervalMsec,
|
|
|
+ this.getClass().getSimpleName());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
protected void closeInternal() throws Exception {
|
|
|
- if (compactionTimer != null) {
|
|
|
- compactionTimer.cancel();
|
|
|
- compactionTimer = null;
|
|
|
- }
|
|
|
- if (db != null) {
|
|
|
- db.close();
|
|
|
- db = null;
|
|
|
- }
|
|
|
+ dbManager.close();
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
@@ -228,33 +187,22 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
|
|
|
@Override
|
|
|
protected Version loadVersion() throws Exception {
|
|
|
- Version version = null;
|
|
|
- try {
|
|
|
- byte[] data = db.get(bytes(VERSION_NODE));
|
|
|
- if (data != null) {
|
|
|
- version = new VersionPBImpl(VersionProto.parseFrom(data));
|
|
|
- }
|
|
|
- } catch (DBException e) {
|
|
|
- throw new IOException(e);
|
|
|
- }
|
|
|
- return version;
|
|
|
+ return dbManager.loadVersion(VERSION_NODE);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
protected void storeVersion() throws Exception {
|
|
|
- dbStoreVersion(CURRENT_VERSION_INFO);
|
|
|
- }
|
|
|
-
|
|
|
- void dbStoreVersion(Version state) throws IOException {
|
|
|
- String key = VERSION_NODE;
|
|
|
- byte[] data = ((VersionPBImpl) state).getProto().toByteArray();
|
|
|
try {
|
|
|
- db.put(bytes(key), data);
|
|
|
+ storeVersion(CURRENT_VERSION_INFO);
|
|
|
} catch (DBException e) {
|
|
|
throw new IOException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ protected void storeVersion(Version version) {
|
|
|
+ dbManager.storeVersion(VERSION_NODE, version);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
protected Version getCurrentVersion() {
|
|
|
return CURRENT_VERSION_INFO;
|
|
@@ -290,9 +238,7 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
|
|
|
private void loadReservationState(RMState rmState) throws IOException {
|
|
|
int numReservations = 0;
|
|
|
- LeveldbIterator iter = null;
|
|
|
- try {
|
|
|
- iter = new LeveldbIterator(db);
|
|
|
+ try (LeveldbIterator iter = new LeveldbIterator(db)) {
|
|
|
iter.seek(bytes(RM_RESERVATION_KEY_PREFIX));
|
|
|
while (iter.hasNext()) {
|
|
|
Entry<byte[],byte[]> entry = iter.next();
|
|
@@ -324,10 +270,6 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
}
|
|
|
} catch (DBException e) {
|
|
|
throw new IOException(e);
|
|
|
- } finally {
|
|
|
- if (iter != null) {
|
|
|
- iter.close();
|
|
|
- }
|
|
|
}
|
|
|
LOG.info("Recovered " + numReservations + " reservations");
|
|
|
}
|
|
@@ -342,9 +284,7 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
|
|
|
private int loadRMDTSecretManagerKeys(RMState state) throws IOException {
|
|
|
int numKeys = 0;
|
|
|
- LeveldbIterator iter = null;
|
|
|
- try {
|
|
|
- iter = new LeveldbIterator(db);
|
|
|
+ try (LeveldbIterator iter = new LeveldbIterator(db)) {
|
|
|
iter.seek(bytes(RM_DT_MASTER_KEY_KEY_PREFIX));
|
|
|
while (iter.hasNext()) {
|
|
|
Entry<byte[],byte[]> entry = iter.next();
|
|
@@ -361,10 +301,6 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
}
|
|
|
} catch (DBException e) {
|
|
|
throw new IOException(e);
|
|
|
- } finally {
|
|
|
- if (iter != null) {
|
|
|
- iter.close();
|
|
|
- }
|
|
|
}
|
|
|
return numKeys;
|
|
|
}
|
|
@@ -382,9 +318,7 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
|
|
|
private int loadRMDTSecretManagerTokens(RMState state) throws IOException {
|
|
|
int numTokens = 0;
|
|
|
- LeveldbIterator iter = null;
|
|
|
- try {
|
|
|
- iter = new LeveldbIterator(db);
|
|
|
+ try (LeveldbIterator iter = new LeveldbIterator(db)) {
|
|
|
iter.seek(bytes(RM_DT_TOKEN_KEY_PREFIX));
|
|
|
while (iter.hasNext()) {
|
|
|
Entry<byte[],byte[]> entry = iter.next();
|
|
@@ -404,17 +338,13 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
}
|
|
|
} catch (DBException e) {
|
|
|
throw new IOException(e);
|
|
|
- } finally {
|
|
|
- if (iter != null) {
|
|
|
- iter.close();
|
|
|
- }
|
|
|
}
|
|
|
return numTokens;
|
|
|
}
|
|
|
|
|
|
private RMDelegationTokenIdentifierData loadDelegationToken(byte[] data)
|
|
|
throws IOException {
|
|
|
- RMDelegationTokenIdentifierData tokenData = null;
|
|
|
+ RMDelegationTokenIdentifierData tokenData;
|
|
|
DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
|
|
|
try {
|
|
|
tokenData = RMStateStoreUtils.readRMDelegationTokenIdentifierData(in);
|
|
@@ -426,7 +356,7 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
|
|
|
private void loadRMDTSecretManagerTokenSequenceNumber(RMState state)
|
|
|
throws IOException {
|
|
|
- byte[] data = null;
|
|
|
+ byte[] data;
|
|
|
try {
|
|
|
data = db.get(bytes(RM_DT_SEQUENCE_NUMBER_KEY));
|
|
|
} catch (DBException e) {
|
|
@@ -445,9 +375,7 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
private void loadRMApps(RMState state) throws IOException {
|
|
|
int numApps = 0;
|
|
|
int numAppAttempts = 0;
|
|
|
- LeveldbIterator iter = null;
|
|
|
- try {
|
|
|
- iter = new LeveldbIterator(db);
|
|
|
+ try (LeveldbIterator iter = new LeveldbIterator(db)) {
|
|
|
iter.seek(bytes(RM_APP_KEY_PREFIX));
|
|
|
while (iter.hasNext()) {
|
|
|
Entry<byte[],byte[]> entry = iter.next();
|
|
@@ -467,10 +395,6 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
}
|
|
|
} catch (DBException e) {
|
|
|
throw new IOException(e);
|
|
|
- } finally {
|
|
|
- if (iter != null) {
|
|
|
- iter.close();
|
|
|
- }
|
|
|
}
|
|
|
LOG.info("Recovered " + numApps + " applications and " + numAppAttempts
|
|
|
+ " application attempts");
|
|
@@ -523,7 +447,7 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
@VisibleForTesting
|
|
|
ApplicationStateData loadRMAppState(ApplicationId appId) throws IOException {
|
|
|
String appKey = getApplicationNodeKey(appId);
|
|
|
- byte[] data = null;
|
|
|
+ byte[] data;
|
|
|
try {
|
|
|
data = db.get(bytes(appKey));
|
|
|
} catch (DBException e) {
|
|
@@ -539,7 +463,7 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
ApplicationAttemptStateData loadRMAppAttemptState(
|
|
|
ApplicationAttemptId attemptId) throws IOException {
|
|
|
String attemptKey = getApplicationAttemptNodeKey(attemptId);
|
|
|
- byte[] data = null;
|
|
|
+ byte[] data;
|
|
|
try {
|
|
|
data = db.get(bytes(attemptKey));
|
|
|
} catch (DBException e) {
|
|
@@ -668,8 +592,7 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
appState.getApplicationSubmissionContext().getApplicationId();
|
|
|
String appKey = getApplicationNodeKey(appId);
|
|
|
try {
|
|
|
- WriteBatch batch = db.createWriteBatch();
|
|
|
- try {
|
|
|
+ try (WriteBatch batch = db.createWriteBatch()) {
|
|
|
batch.delete(bytes(appKey));
|
|
|
for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
|
|
|
String attemptKey = getApplicationAttemptNodeKey(appKey, attemptId);
|
|
@@ -680,8 +603,6 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
+ appState.attempts.size() + " attempts" + " at " + appKey);
|
|
|
}
|
|
|
db.write(batch);
|
|
|
- } finally {
|
|
|
- batch.close();
|
|
|
}
|
|
|
} catch (DBException e) {
|
|
|
throw new IOException(e);
|
|
@@ -693,16 +614,13 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
ReservationAllocationStateProto reservationAllocation, String planName,
|
|
|
String reservationIdName) throws Exception {
|
|
|
try {
|
|
|
- WriteBatch batch = db.createWriteBatch();
|
|
|
- try {
|
|
|
+ try (WriteBatch batch = db.createWriteBatch()) {
|
|
|
String key = getReservationNodeKey(planName, reservationIdName);
|
|
|
LOG.debug("Storing state for reservation {} plan {} at {}",
|
|
|
reservationIdName, planName, key);
|
|
|
|
|
|
batch.put(bytes(key), reservationAllocation.toByteArray());
|
|
|
db.write(batch);
|
|
|
- } finally {
|
|
|
- batch.close();
|
|
|
}
|
|
|
} catch (DBException e) {
|
|
|
throw new IOException(e);
|
|
@@ -713,16 +631,13 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
protected void removeReservationState(String planName,
|
|
|
String reservationIdName) throws Exception {
|
|
|
try {
|
|
|
- WriteBatch batch = db.createWriteBatch();
|
|
|
- try {
|
|
|
+ try (WriteBatch batch = db.createWriteBatch()) {
|
|
|
String reservationKey =
|
|
|
getReservationNodeKey(planName, reservationIdName);
|
|
|
batch.delete(bytes(reservationKey));
|
|
|
LOG.debug("Removing state for reservation {} plan {} at {}",
|
|
|
reservationIdName, planName, reservationKey);
|
|
|
db.write(batch);
|
|
|
- } finally {
|
|
|
- batch.close();
|
|
|
}
|
|
|
} catch (DBException e) {
|
|
|
throw new IOException(e);
|
|
@@ -736,10 +651,9 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
new RMDelegationTokenIdentifierData(tokenId, renewDate);
|
|
|
LOG.debug("Storing token to {}", tokenKey);
|
|
|
try {
|
|
|
- WriteBatch batch = db.createWriteBatch();
|
|
|
- try {
|
|
|
+ try (WriteBatch batch = db.createWriteBatch()) {
|
|
|
batch.put(bytes(tokenKey), tokenData.toByteArray());
|
|
|
- if(!isUpdate) {
|
|
|
+ if (!isUpdate) {
|
|
|
ByteArrayOutputStream bs = new ByteArrayOutputStream();
|
|
|
try (DataOutputStream ds = new DataOutputStream(bs)) {
|
|
|
ds.writeInt(tokenId.getSequenceNumber());
|
|
@@ -749,8 +663,6 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
batch.put(bytes(RM_DT_SEQUENCE_NUMBER_KEY), bs.toByteArray());
|
|
|
}
|
|
|
db.write(batch);
|
|
|
- } finally {
|
|
|
- batch.close();
|
|
|
}
|
|
|
} catch (DBException e) {
|
|
|
throw new IOException(e);
|
|
@@ -789,11 +701,8 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
String dbKey = getRMDTMasterKeyNodeKey(masterKey);
|
|
|
LOG.debug("Storing token master key to {}", dbKey);
|
|
|
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
|
|
- DataOutputStream out = new DataOutputStream(os);
|
|
|
- try {
|
|
|
+ try (DataOutputStream out = new DataOutputStream(os)) {
|
|
|
masterKey.write(out);
|
|
|
- } finally {
|
|
|
- out.close();
|
|
|
}
|
|
|
try {
|
|
|
db.put(bytes(dbKey), os.toByteArray());
|
|
@@ -833,13 +742,10 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
String caPrivateKeyKey = getProxyCAPrivateKeyNodeKey();
|
|
|
|
|
|
try {
|
|
|
- WriteBatch batch = db.createWriteBatch();
|
|
|
- try {
|
|
|
+ try (WriteBatch batch = db.createWriteBatch()) {
|
|
|
batch.put(bytes(caCertKey), caCertData);
|
|
|
batch.put(bytes(caPrivateKeyKey), caPrivateKeyData);
|
|
|
db.write(batch);
|
|
|
- } finally {
|
|
|
- batch.close();
|
|
|
}
|
|
|
} catch (DBException e) {
|
|
|
throw new IOException(e);
|
|
@@ -871,9 +777,7 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
@VisibleForTesting
|
|
|
int getNumEntriesInDatabase() throws IOException {
|
|
|
int numEntries = 0;
|
|
|
- LeveldbIterator iter = null;
|
|
|
- try {
|
|
|
- iter = new LeveldbIterator(db);
|
|
|
+ try (LeveldbIterator iter = new LeveldbIterator(db)) {
|
|
|
iter.seekToFirst();
|
|
|
while (iter.hasNext()) {
|
|
|
Entry<byte[], byte[]> entry = iter.next();
|
|
@@ -882,26 +786,12 @@ public class LeveldbRMStateStore extends RMStateStore {
|
|
|
}
|
|
|
} catch (DBException e) {
|
|
|
throw new IOException(e);
|
|
|
- } finally {
|
|
|
- if (iter != null) {
|
|
|
- iter.close();
|
|
|
- }
|
|
|
}
|
|
|
return numEntries;
|
|
|
}
|
|
|
|
|
|
- private class CompactionTimerTask extends TimerTask {
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- long start = Time.monotonicNow();
|
|
|
- LOG.info("Starting full compaction cycle");
|
|
|
- try {
|
|
|
- db.compactRange(null, null);
|
|
|
- } catch (DBException e) {
|
|
|
- LOG.error("Error compacting database", e);
|
|
|
- }
|
|
|
- long duration = Time.monotonicNow() - start;
|
|
|
- LOG.info("Full compaction cycle completed in " + duration + " msec");
|
|
|
- }
|
|
|
+ @VisibleForTesting
|
|
|
+ protected void setDbManager(DBManager dbManager) {
|
|
|
+ this.dbManager = dbManager;
|
|
|
}
|
|
|
}
|