|
@@ -103,6 +103,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
|
@@ -1324,20 +1325,28 @@ public class DataNode extends Configured
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // shutdown BPOS thread TODO:FEDERATION - review if this is enough
|
|
|
- if (nameNodeThreads != null) {
|
|
|
- for(BPOfferService bpos : nameNodeThreads) {
|
|
|
- if(bpos != null && bpos.bpThread!=null) {
|
|
|
- try {
|
|
|
- bpos.bpThread.interrupt();
|
|
|
- bpos.bpThread.join();
|
|
|
- } catch (InterruptedException ie) {}
|
|
|
- }
|
|
|
+ // interrupt all the threads, let them discover that shouldRun is false now
|
|
|
+ for(BPOfferService bpos : nameNodeThreads) {
|
|
|
+ if(bpos != null && bpos.bpThread!=null) {
|
|
|
+ bpos.bpThread.interrupt();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // wait until the bp threads are done.
|
|
|
+ for(BPOfferService bpos : nameNodeThreads) {
|
|
|
+ if(bpos != null && bpos.bpThread!=null) {
|
|
|
+ try {
|
|
|
+ bpos.bpThread.join();
|
|
|
+ } catch (InterruptedException ignored) {}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- RPC.stopProxy(namenode); // stop the RPC threads
|
|
|
-
|
|
|
+
|
|
|
+ // stop all the proxy threads
|
|
|
+ for(BPOfferService bpos: nameNodeThreads) {
|
|
|
+ if(bpos!=null && bpos.bpNamenode!=null)
|
|
|
+ RPC.stopProxy(bpos.bpNamenode); // stop the RPC threads
|
|
|
+ }
|
|
|
+
|
|
|
if(upgradeManager != null)
|
|
|
upgradeManager.shutdownUpgrade();
|
|
|
if (blockScannerThread != null) {
|
|
@@ -1401,14 +1410,14 @@ public class DataNode extends Configured
|
|
|
// DN will be shutdown and NN should remove it
|
|
|
dp_error = DatanodeProtocol.FATAL_DISK_ERROR;
|
|
|
}
|
|
|
- //inform NameNode
|
|
|
- try {
|
|
|
- namenode.errorReport(
|
|
|
- dnRegistration, dp_error, errMsgr);
|
|
|
- } catch(IOException ignored) {
|
|
|
+ //inform NameNodes
|
|
|
+ for(BPOfferService bpos: nameNodeThreads) {
|
|
|
+ DatanodeProtocol nn = bpos.bpNamenode;
|
|
|
+ try {
|
|
|
+ nn.errorReport(bpos.bpRegistration, dp_error, errMsgr);
|
|
|
+ } catch(IOException ignored) { }
|
|
|
}
|
|
|
|
|
|
-
|
|
|
if(hasEnoughResource) {
|
|
|
scheduleAllBlockReport(0);
|
|
|
return; // do not shutdown
|