|
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto
|
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
|
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
|
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
|
|
import org.apache.hadoop.yarn.server.records.Version;
|
|
import org.apache.hadoop.yarn.server.records.Version;
|
|
@@ -158,6 +159,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
|
|
|
|
private DB db;
|
|
private DB db;
|
|
private boolean isNewlyCreated;
|
|
private boolean isNewlyCreated;
|
|
|
|
+ private boolean isHealthy;
|
|
private Timer compactionTimer;
|
|
private Timer compactionTimer;
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -172,6 +174,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
protected void startStorage() throws IOException {
|
|
protected void startStorage() throws IOException {
|
|
|
|
+ // Assume that we're healthy when we start
|
|
|
|
+ isHealthy = true;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -190,6 +194,36 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
return isNewlyCreated;
|
|
return isNewlyCreated;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * If the state store throws an error after recovery has been performed
|
|
|
|
+ * then we can not trust it any more to reflect the NM state. We need to
|
|
|
|
+ * mark the store and node unhealthy.
|
|
|
|
+ * Errors during the recovery will cause a service failure and thus a NM
|
|
|
|
+ * start failure. Do not need to mark the store unhealthy for those.
|
|
|
|
+ * @param dbErr Exception
|
|
|
|
+ */
|
|
|
|
+ private void markStoreUnHealthy(DBException dbErr) {
|
|
|
|
+ // Always log the error here, we might not see the error in the caller
|
|
|
|
+ LOG.error("Statestore exception: ", dbErr);
|
|
|
|
+ // We have already been marked unhealthy so no need to do it again.
|
|
|
|
+ if (!isHealthy) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ // Mark unhealthy, an out of band heartbeat will be sent and the state
|
|
|
|
+ // will remain unhealthy (not recoverable).
|
|
|
|
+ // No need to close the store: does not make any difference at this point.
|
|
|
|
+ isHealthy = false;
|
|
|
|
+ // We could get here before the nodeStatusUpdater is set
|
|
|
|
+ NodeStatusUpdater nsu = getNodeStatusUpdater();
|
|
|
|
+ if (nsu != null) {
|
|
|
|
+ nsu.reportException(dbErr);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ boolean isHealthy() {
|
|
|
|
+ return isHealthy;
|
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public List<RecoveredContainerState> loadContainersState()
|
|
public List<RecoveredContainerState> loadContainersState()
|
|
@@ -362,6 +396,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
db.write(batch);
|
|
db.write(batch);
|
|
}
|
|
}
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -386,6 +421,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.put(bytes(key), EMPTY_VALUE);
|
|
db.put(bytes(key), EMPTY_VALUE);
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -401,6 +437,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.delete(bytes(key));
|
|
db.delete(bytes(key));
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -416,6 +453,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.put(bytes(key), EMPTY_VALUE);
|
|
db.put(bytes(key), EMPTY_VALUE);
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -432,6 +470,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.delete(bytes(key));
|
|
db.delete(bytes(key));
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -449,6 +488,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.put(bytes(key), bytes(diagnostics.toString()));
|
|
db.put(bytes(key), bytes(diagnostics.toString()));
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -467,6 +507,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.put(bytes(key), EMPTY_VALUE);
|
|
db.put(bytes(key), EMPTY_VALUE);
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -496,6 +537,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
batch.close();
|
|
batch.close();
|
|
}
|
|
}
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -512,6 +554,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.put(bytes(key), EMPTY_VALUE);
|
|
db.put(bytes(key), EMPTY_VALUE);
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -528,6 +571,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.put(bytes(key), bytes(Integer.toString(exitCode)));
|
|
db.put(bytes(key), bytes(Integer.toString(exitCode)));
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -540,6 +584,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.put(bytes(key), bytes(Integer.toString(remainingRetryAttempts)));
|
|
db.put(bytes(key), bytes(Integer.toString(remainingRetryAttempts)));
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -552,6 +597,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.put(bytes(key), bytes(workDir));
|
|
db.put(bytes(key), bytes(workDir));
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -564,6 +610,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.put(bytes(key), bytes(logDir));
|
|
db.put(bytes(key), bytes(logDir));
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -597,6 +644,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
batch.close();
|
|
batch.close();
|
|
}
|
|
}
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -646,6 +694,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.put(bytes(key), p.toByteArray());
|
|
db.put(bytes(key), p.toByteArray());
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -667,6 +716,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
batch.close();
|
|
batch.close();
|
|
}
|
|
}
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -823,6 +873,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.put(bytes(key), proto.toByteArray());
|
|
db.put(bytes(key), proto.toByteArray());
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -846,6 +897,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
batch.close();
|
|
batch.close();
|
|
}
|
|
}
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -869,6 +921,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
batch.close();
|
|
batch.close();
|
|
}
|
|
}
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -934,6 +987,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.put(bytes(key), taskProto.toByteArray());
|
|
db.put(bytes(key), taskProto.toByteArray());
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -944,6 +998,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.delete(bytes(key));
|
|
db.delete(bytes(key));
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1017,6 +1072,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.delete(bytes(key));
|
|
db.delete(bytes(key));
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1031,6 +1087,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.put(bytes(dbKey), pb.getProto().toByteArray());
|
|
db.put(bytes(dbKey), pb.getProto().toByteArray());
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1104,6 +1161,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.put(bytes(key), bytes(expTime.toString()));
|
|
db.put(bytes(key), bytes(expTime.toString()));
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1115,6 +1173,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.delete(bytes(key));
|
|
db.delete(bytes(key));
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1165,6 +1224,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.put(bytes(key), proto.toByteArray());
|
|
db.put(bytes(key), proto.toByteArray());
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1175,6 +1235,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.delete(bytes(key));
|
|
db.delete(bytes(key));
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1207,6 +1268,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
batch.close();
|
|
batch.close();
|
|
}
|
|
}
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1370,6 +1432,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.delete(bytes(dbkey));
|
|
db.delete(bytes(dbkey));
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
return;
|
|
return;
|
|
@@ -1384,6 +1447,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.put(bytes(fullkey), data);
|
|
db.put(bytes(fullkey), data);
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1395,6 +1459,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
try {
|
|
try {
|
|
db.delete(bytes(fullkey));
|
|
db.delete(bytes(fullkey));
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1418,6 +1483,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
candidates.add(key);
|
|
candidates.add(key);
|
|
}
|
|
}
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
} finally {
|
|
} finally {
|
|
if (iter != null) {
|
|
if (iter != null) {
|
|
@@ -1431,6 +1497,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
db.delete(bytes(key));
|
|
db.delete(bytes(key));
|
|
}
|
|
}
|
|
} catch (DBException e) {
|
|
} catch (DBException e) {
|
|
|
|
+ markStoreUnHealthy(e);
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1552,6 +1619,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
return db;
|
|
return db;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ void setDB(DB testDb) {
|
|
|
|
+ this.db = testDb;
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
|
|
* 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
|
|
* 2) Any incompatible change of state-store is a major upgrade, and any
|
|
* 2) Any incompatible change of state-store is a major upgrade, and any
|