|
@@ -678,15 +678,20 @@ class BPOfferService {
|
|
|
actor.reRegister();
|
|
|
return false;
|
|
|
}
|
|
|
- writeLock();
|
|
|
+ boolean isActiveActor;
|
|
|
+ InetSocketAddress nnSocketAddress;
|
|
|
+ readLock();
|
|
|
try {
|
|
|
- if (actor == bpServiceToActive) {
|
|
|
- return processCommandFromActive(cmd, actor);
|
|
|
- } else {
|
|
|
- return processCommandFromStandby(cmd, actor);
|
|
|
- }
|
|
|
+ isActiveActor = (actor == bpServiceToActive);
|
|
|
+ nnSocketAddress = actor.getNNSocketAddress();
|
|
|
} finally {
|
|
|
- writeUnlock();
|
|
|
+ readUnlock();
|
|
|
+ }
|
|
|
+
|
|
|
+ if (isActiveActor) {
|
|
|
+ return processCommandFromActive(cmd, nnSocketAddress);
|
|
|
+ } else {
|
|
|
+ return processCommandFromStandby(cmd, nnSocketAddress);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -714,7 +719,7 @@ class BPOfferService {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
private boolean processCommandFromActive(DatanodeCommand cmd,
|
|
|
- BPServiceActor actor) throws IOException {
|
|
|
+ InetSocketAddress nnSocketAddress) throws IOException {
|
|
|
final BlockCommand bcmd =
|
|
|
cmd instanceof BlockCommand? (BlockCommand)cmd: null;
|
|
|
final BlockIdCommand blockIdCmd =
|
|
@@ -768,7 +773,7 @@ class BPOfferService {
|
|
|
dn.finalizeUpgradeForPool(bp);
|
|
|
break;
|
|
|
case DatanodeProtocol.DNA_RECOVERBLOCK:
|
|
|
- String who = "NameNode at " + actor.getNNSocketAddress();
|
|
|
+ String who = "NameNode at " + nnSocketAddress;
|
|
|
dn.getBlockRecoveryWorker().recoverBlocks(who,
|
|
|
((BlockRecoveryCommand)cmd).getRecoveringBlocks());
|
|
|
break;
|
|
@@ -810,10 +815,11 @@ class BPOfferService {
|
|
|
* DNA_REGISTER which should be handled earlier itself.
|
|
|
*/
|
|
|
private boolean processCommandFromStandby(DatanodeCommand cmd,
|
|
|
- BPServiceActor actor) throws IOException {
|
|
|
+ InetSocketAddress nnSocketAddress) throws IOException {
|
|
|
switch(cmd.getAction()) {
|
|
|
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
|
|
|
- LOG.info("DatanodeCommand action from standby: DNA_ACCESSKEYUPDATE");
|
|
|
+ LOG.info("DatanodeCommand action from standby NN {}: DNA_ACCESSKEYUPDATE",
|
|
|
+ nnSocketAddress);
|
|
|
if (dn.isBlockTokenEnabled) {
|
|
|
dn.blockPoolTokenSecretManager.addKeys(
|
|
|
getBlockPoolId(),
|
|
@@ -829,10 +835,12 @@ class BPOfferService {
|
|
|
case DatanodeProtocol.DNA_CACHE:
|
|
|
case DatanodeProtocol.DNA_UNCACHE:
|
|
|
case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:
|
|
|
- LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
|
|
|
+ LOG.warn("Got a command from standby NN {} - ignoring command: {}",
|
|
|
+ nnSocketAddress, cmd.getAction());
|
|
|
break;
|
|
|
default:
|
|
|
- LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
|
|
|
+ LOG.warn("Unknown DatanodeCommand action: {} from standby NN {}",
|
|
|
+ cmd.getAction(), nnSocketAddress);
|
|
|
}
|
|
|
return true;
|
|
|
}
|