|
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.base.Preconditions;
|
|
|
import com.google.common.collect.Lists;
|
|
|
import com.google.common.collect.Sets;
|
|
|
+
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
|
@@ -38,6 +39,8 @@ import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.CopyOnWriteArrayList;
|
|
|
+import java.util.concurrent.locks.Lock;
|
|
|
+import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
|
|
|
/**
|
|
|
* One instance per block-pool/namespace on the DN, which handles the
|
|
@@ -91,6 +94,28 @@ class BPOfferService {
|
|
|
*/
|
|
|
private long lastActiveClaimTxId = -1;
|
|
|
|
|
|
+ private final ReentrantReadWriteLock mReadWriteLock =
|
|
|
+ new ReentrantReadWriteLock();
|
|
|
+ private final Lock mReadLock = mReadWriteLock.readLock();
|
|
|
+ private final Lock mWriteLock = mReadWriteLock.writeLock();
|
|
|
+
|
|
|
+ // utility methods to acquire and release read lock and write lock
|
|
|
+ void readLock() {
|
|
|
+ mReadLock.lock();
|
|
|
+ }
|
|
|
+
|
|
|
+ void readUnlock() {
|
|
|
+ mReadLock.unlock();
|
|
|
+ }
|
|
|
+
|
|
|
+ void writeLock() {
|
|
|
+ mWriteLock.lock();
|
|
|
+ }
|
|
|
+
|
|
|
+ void writeUnlock() {
|
|
|
+ mWriteLock.unlock();
|
|
|
+ }
|
|
|
+
|
|
|
BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
|
|
|
Preconditions.checkArgument(!nnAddrs.isEmpty(),
|
|
|
"Must pass at least one NN.");
|
|
@@ -135,14 +160,19 @@ class BPOfferService {
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
|
-
|
|
|
- synchronized String getBlockPoolId() {
|
|
|
- if (bpNSInfo != null) {
|
|
|
- return bpNSInfo.getBlockPoolID();
|
|
|
- } else {
|
|
|
- LOG.warn("Block pool ID needed, but service not yet registered with NN",
|
|
|
- new Exception("trace"));
|
|
|
- return null;
|
|
|
+
|
|
|
+ String getBlockPoolId() {
|
|
|
+ readLock();
|
|
|
+ try {
|
|
|
+ if (bpNSInfo != null) {
|
|
|
+ return bpNSInfo.getBlockPoolID();
|
|
|
+ } else {
|
|
|
+ LOG.warn("Block pool ID needed, but service not yet registered with NN",
|
|
|
+ new Exception("trace"));
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ readUnlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -150,27 +180,37 @@ class BPOfferService {
|
|
|
return getNamespaceInfo() != null;
|
|
|
}
|
|
|
|
|
|
- synchronized NamespaceInfo getNamespaceInfo() {
|
|
|
- return bpNSInfo;
|
|
|
+ NamespaceInfo getNamespaceInfo() {
|
|
|
+ readLock();
|
|
|
+ try {
|
|
|
+ return bpNSInfo;
|
|
|
+ } finally {
|
|
|
+ readUnlock();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public synchronized String toString() {
|
|
|
- if (bpNSInfo == null) {
|
|
|
- // If we haven't yet connected to our NN, we don't yet know our
|
|
|
- // own block pool ID.
|
|
|
- // If _none_ of the block pools have connected yet, we don't even
|
|
|
- // know the DatanodeID ID of this DN.
|
|
|
- String datanodeUuid = dn.getDatanodeUuid();
|
|
|
-
|
|
|
- if (datanodeUuid == null || datanodeUuid.isEmpty()) {
|
|
|
- datanodeUuid = "unassigned";
|
|
|
+ public String toString() {
|
|
|
+ readLock();
|
|
|
+ try {
|
|
|
+ if (bpNSInfo == null) {
|
|
|
+ // If we haven't yet connected to our NN, we don't yet know our
|
|
|
+ // own block pool ID.
|
|
|
+ // If _none_ of the block pools have connected yet, we don't even
|
|
|
+ // know the DatanodeID ID of this DN.
|
|
|
+ String datanodeUuid = dn.getDatanodeUuid();
|
|
|
+
|
|
|
+ if (datanodeUuid == null || datanodeUuid.isEmpty()) {
|
|
|
+ datanodeUuid = "unassigned";
|
|
|
+ }
|
|
|
+ return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
|
|
|
+ } else {
|
|
|
+ return "Block pool " + getBlockPoolId() +
|
|
|
+ " (Datanode Uuid " + dn.getDatanodeUuid() +
|
|
|
+ ")";
|
|
|
}
|
|
|
- return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
|
|
|
- } else {
|
|
|
- return "Block pool " + getBlockPoolId() +
|
|
|
- " (Datanode Uuid " + dn.getDatanodeUuid() +
|
|
|
- ")";
|
|
|
+ } finally {
|
|
|
+ readUnlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -266,32 +306,37 @@ class BPOfferService {
|
|
|
* verifies that this namespace matches (eg to prevent a misconfiguration
|
|
|
* where a StandbyNode from a different cluster is specified)
|
|
|
*/
|
|
|
- synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
|
|
|
- if (this.bpNSInfo == null) {
|
|
|
- this.bpNSInfo = nsInfo;
|
|
|
- boolean success = false;
|
|
|
-
|
|
|
- // Now that we know the namespace ID, etc, we can pass this to the DN.
|
|
|
- // The DN can now initialize its local storage if we are the
|
|
|
- // first BP to handshake, etc.
|
|
|
- try {
|
|
|
- dn.initBlockPool(this);
|
|
|
- success = true;
|
|
|
- } finally {
|
|
|
- if (!success) {
|
|
|
- // The datanode failed to initialize the BP. We need to reset
|
|
|
- // the namespace info so that other BPService actors still have
|
|
|
- // a chance to set it, and re-initialize the datanode.
|
|
|
- this.bpNSInfo = null;
|
|
|
+ void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
|
|
|
+ writeLock();
|
|
|
+ try {
|
|
|
+ if (this.bpNSInfo == null) {
|
|
|
+ this.bpNSInfo = nsInfo;
|
|
|
+ boolean success = false;
|
|
|
+
|
|
|
+ // Now that we know the namespace ID, etc, we can pass this to the DN.
|
|
|
+ // The DN can now initialize its local storage if we are the
|
|
|
+ // first BP to handshake, etc.
|
|
|
+ try {
|
|
|
+ dn.initBlockPool(this);
|
|
|
+ success = true;
|
|
|
+ } finally {
|
|
|
+ if (!success) {
|
|
|
+ // The datanode failed to initialize the BP. We need to reset
|
|
|
+ // the namespace info so that other BPService actors still have
|
|
|
+ // a chance to set it, and re-initialize the datanode.
|
|
|
+ this.bpNSInfo = null;
|
|
|
+ }
|
|
|
}
|
|
|
+ } else {
|
|
|
+ checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
|
|
|
+ "Blockpool ID");
|
|
|
+ checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
|
|
|
+ "Namespace ID");
|
|
|
+ checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
|
|
|
+ "Cluster ID");
|
|
|
}
|
|
|
- } else {
|
|
|
- checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
|
|
|
- "Blockpool ID");
|
|
|
- checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
|
|
|
- "Namespace ID");
|
|
|
- checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
|
|
|
- "Cluster ID");
|
|
|
+ } finally {
|
|
|
+ writeUnlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -300,22 +345,27 @@ class BPOfferService {
|
|
|
* NN, it calls this function to verify that the NN it connected to
|
|
|
* is consistent with other NNs serving the block-pool.
|
|
|
*/
|
|
|
- synchronized void registrationSucceeded(BPServiceActor bpServiceActor,
|
|
|
+ void registrationSucceeded(BPServiceActor bpServiceActor,
|
|
|
DatanodeRegistration reg) throws IOException {
|
|
|
- if (bpRegistration != null) {
|
|
|
- checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
|
|
|
- reg.getStorageInfo().getNamespaceID(), "namespace ID");
|
|
|
- checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
|
|
|
- reg.getStorageInfo().getClusterID(), "cluster ID");
|
|
|
- } else {
|
|
|
- bpRegistration = reg;
|
|
|
- }
|
|
|
-
|
|
|
- dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
|
|
|
- // Add the initial block token secret keys to the DN's secret manager.
|
|
|
- if (dn.isBlockTokenEnabled) {
|
|
|
- dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
|
|
|
- reg.getExportedKeys());
|
|
|
+ writeLock();
|
|
|
+ try {
|
|
|
+ if (bpRegistration != null) {
|
|
|
+ checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
|
|
|
+ reg.getStorageInfo().getNamespaceID(), "namespace ID");
|
|
|
+ checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
|
|
|
+ reg.getStorageInfo().getClusterID(), "cluster ID");
|
|
|
+ } else {
|
|
|
+ bpRegistration = reg;
|
|
|
+ }
|
|
|
+
|
|
|
+ dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
|
|
|
+ // Add the initial block token secret keys to the DN's secret manager.
|
|
|
+ if (dn.isBlockTokenEnabled) {
|
|
|
+ dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
|
|
|
+ reg.getExportedKeys());
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ writeUnlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -333,25 +383,35 @@ class BPOfferService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- synchronized DatanodeRegistration createRegistration() {
|
|
|
- Preconditions.checkState(bpNSInfo != null,
|
|
|
- "getRegistration() can only be called after initial handshake");
|
|
|
- return dn.createBPRegistration(bpNSInfo);
|
|
|
+ DatanodeRegistration createRegistration() {
|
|
|
+ writeLock();
|
|
|
+ try {
|
|
|
+ Preconditions.checkState(bpNSInfo != null,
|
|
|
+ "getRegistration() can only be called after initial handshake");
|
|
|
+ return dn.createBPRegistration(bpNSInfo);
|
|
|
+ } finally {
|
|
|
+ writeUnlock();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Called when an actor shuts down. If this is the last actor
|
|
|
* to shut down, shuts down the whole blockpool in the DN.
|
|
|
*/
|
|
|
- synchronized void shutdownActor(BPServiceActor actor) {
|
|
|
- if (bpServiceToActive == actor) {
|
|
|
- bpServiceToActive = null;
|
|
|
- }
|
|
|
+ void shutdownActor(BPServiceActor actor) {
|
|
|
+ writeLock();
|
|
|
+ try {
|
|
|
+ if (bpServiceToActive == actor) {
|
|
|
+ bpServiceToActive = null;
|
|
|
+ }
|
|
|
|
|
|
- bpServices.remove(actor);
|
|
|
+ bpServices.remove(actor);
|
|
|
|
|
|
- if (bpServices.isEmpty()) {
|
|
|
- dn.shutdownBlockPool(this);
|
|
|
+ if (bpServices.isEmpty()) {
|
|
|
+ dn.shutdownBlockPool(this);
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ writeUnlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -392,11 +452,16 @@ class BPOfferService {
|
|
|
* @return a proxy to the active NN, or null if the BPOS has not
|
|
|
* acknowledged any NN as active yet.
|
|
|
*/
|
|
|
- synchronized DatanodeProtocolClientSideTranslatorPB getActiveNN() {
|
|
|
- if (bpServiceToActive != null) {
|
|
|
- return bpServiceToActive.bpNamenode;
|
|
|
- } else {
|
|
|
- return null;
|
|
|
+ DatanodeProtocolClientSideTranslatorPB getActiveNN() {
|
|
|
+ readLock();
|
|
|
+ try {
|
|
|
+ if (bpServiceToActive != null) {
|
|
|
+ return bpServiceToActive.bpNamenode;
|
|
|
+ } else {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ readUnlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -424,45 +489,50 @@ class BPOfferService {
|
|
|
* @param actor the actor which received the heartbeat
|
|
|
* @param nnHaState the HA-related heartbeat contents
|
|
|
*/
|
|
|
- synchronized void updateActorStatesFromHeartbeat(
|
|
|
+ void updateActorStatesFromHeartbeat(
|
|
|
BPServiceActor actor,
|
|
|
NNHAStatusHeartbeat nnHaState) {
|
|
|
- final long txid = nnHaState.getTxId();
|
|
|
-
|
|
|
- final boolean nnClaimsActive =
|
|
|
- nnHaState.getState() == HAServiceState.ACTIVE;
|
|
|
- final boolean bposThinksActive = bpServiceToActive == actor;
|
|
|
- final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;
|
|
|
-
|
|
|
- if (nnClaimsActive && !bposThinksActive) {
|
|
|
- LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
|
|
|
- "txid=" + txid);
|
|
|
- if (!isMoreRecentClaim) {
|
|
|
- // Split-brain scenario - an NN is trying to claim active
|
|
|
- // state when a different NN has already claimed it with a higher
|
|
|
- // txid.
|
|
|
- LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
|
|
|
- txid + " but there was already a more recent claim at txid=" +
|
|
|
- lastActiveClaimTxId);
|
|
|
- return;
|
|
|
- } else {
|
|
|
- if (bpServiceToActive == null) {
|
|
|
- LOG.info("Acknowledging ACTIVE Namenode " + actor);
|
|
|
+ writeLock();
|
|
|
+ try {
|
|
|
+ final long txid = nnHaState.getTxId();
|
|
|
+
|
|
|
+ final boolean nnClaimsActive =
|
|
|
+ nnHaState.getState() == HAServiceState.ACTIVE;
|
|
|
+ final boolean bposThinksActive = bpServiceToActive == actor;
|
|
|
+ final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;
|
|
|
+
|
|
|
+ if (nnClaimsActive && !bposThinksActive) {
|
|
|
+ LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
|
|
|
+ "txid=" + txid);
|
|
|
+ if (!isMoreRecentClaim) {
|
|
|
+ // Split-brain scenario - an NN is trying to claim active
|
|
|
+ // state when a different NN has already claimed it with a higher
|
|
|
+ // txid.
|
|
|
+ LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
|
|
|
+ txid + " but there was already a more recent claim at txid=" +
|
|
|
+ lastActiveClaimTxId);
|
|
|
+ return;
|
|
|
} else {
|
|
|
- LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
|
|
|
- bpServiceToActive + " at higher txid=" + txid);
|
|
|
+ if (bpServiceToActive == null) {
|
|
|
+ LOG.info("Acknowledging ACTIVE Namenode " + actor);
|
|
|
+ } else {
|
|
|
+ LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
|
|
|
+ bpServiceToActive + " at higher txid=" + txid);
|
|
|
+ }
|
|
|
+ bpServiceToActive = actor;
|
|
|
}
|
|
|
- bpServiceToActive = actor;
|
|
|
+ } else if (!nnClaimsActive && bposThinksActive) {
|
|
|
+ LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
|
|
|
+ "txid=" + nnHaState.getTxId());
|
|
|
+ bpServiceToActive = null;
|
|
|
}
|
|
|
- } else if (!nnClaimsActive && bposThinksActive) {
|
|
|
- LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
|
|
|
- "txid=" + nnHaState.getTxId());
|
|
|
- bpServiceToActive = null;
|
|
|
- }
|
|
|
-
|
|
|
- if (bpServiceToActive == actor) {
|
|
|
- assert txid >= lastActiveClaimTxId;
|
|
|
- lastActiveClaimTxId = txid;
|
|
|
+
|
|
|
+ if (bpServiceToActive == actor) {
|
|
|
+ assert txid >= lastActiveClaimTxId;
|
|
|
+ lastActiveClaimTxId = txid;
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ writeUnlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -533,12 +603,15 @@ class BPOfferService {
|
|
|
actor.reRegister();
|
|
|
return true;
|
|
|
}
|
|
|
- synchronized (this) {
|
|
|
+ writeLock();
|
|
|
+ try {
|
|
|
if (actor == bpServiceToActive) {
|
|
|
return processCommandFromActive(cmd, actor);
|
|
|
} else {
|
|
|
return processCommandFromStandby(cmd, actor);
|
|
|
}
|
|
|
+ } finally {
|
|
|
+ writeUnlock();
|
|
|
}
|
|
|
}
|
|
|
|