|
@@ -26,6 +26,7 @@ import java.net.UnknownHostException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Comparator;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.NavigableMap;
|
|
@@ -55,13 +56,6 @@ import org.apache.hadoop.hdfs.server.namenode.HostFileManager.MutableEntrySet;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.HostFileManager;
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.HostFileManager.Entry;
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.HostFileManager.EntrySet;
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.HostFileManager.MutableEntrySet;
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
-import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
|
|
-import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
|
@@ -71,7 +65,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
|
|
|
import org.apache.hadoop.hdfs.util.CyclicIteration;
|
|
|
-import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.ipc.Server;
|
|
|
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
|
|
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
|
@@ -172,6 +165,14 @@ public class DatanodeManager {
|
|
|
* according to the NetworkTopology.
|
|
|
*/
|
|
|
private boolean hasClusterEverBeenMultiRack = false;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The number of datanodes for each software version. This list should change
|
|
|
+ * during rolling upgrades.
|
|
|
+ * Software version -> Number of datanodes with this version
|
|
|
+ */
|
|
|
+ private HashMap<String, Integer> datanodesSoftwareVersions =
|
|
|
+ new HashMap<String, Integer>(4, 0.75f);
|
|
|
|
|
|
DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
|
|
|
final Configuration conf) throws IOException {
|
|
@@ -463,6 +464,7 @@ public class DatanodeManager {
|
|
|
heartbeatManager.removeDatanode(nodeInfo);
|
|
|
blockManager.removeBlocksAssociatedTo(nodeInfo);
|
|
|
networktopology.remove(nodeInfo);
|
|
|
+ decrementVersionCount(nodeInfo.getSoftwareVersion());
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("remove datanode " + nodeInfo);
|
|
@@ -545,6 +547,61 @@ public class DatanodeManager {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void incrementVersionCount(String version) {
|
|
|
+ if (version == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ synchronized(datanodeMap) {
|
|
|
+ Integer count = this.datanodesSoftwareVersions.get(version);
|
|
|
+ count = count == null ? 1 : count + 1;
|
|
|
+ this.datanodesSoftwareVersions.put(version, count);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void decrementVersionCount(String version) {
|
|
|
+ if (version == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ synchronized(datanodeMap) {
|
|
|
+ Integer count = this.datanodesSoftwareVersions.get(version);
|
|
|
+ if(count != null) {
|
|
|
+ if(count > 1) {
|
|
|
+ this.datanodesSoftwareVersions.put(version, count-1);
|
|
|
+ } else {
|
|
|
+ this.datanodesSoftwareVersions.remove(version);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean shouldCountVersion(DatanodeDescriptor node) {
|
|
|
+ return node.getSoftwareVersion() != null && node.isAlive &&
|
|
|
+ !isDatanodeDead(node);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void countSoftwareVersions() {
|
|
|
+ synchronized(datanodeMap) {
|
|
|
+ HashMap<String, Integer> versionCount = new HashMap<String, Integer>();
|
|
|
+ for(DatanodeDescriptor dn: datanodeMap.values()) {
|
|
|
+ // Check isAlive too because right after removeDatanode(),
|
|
|
+ // isDatanodeDead() is still true
|
|
|
+ if(shouldCountVersion(dn))
|
|
|
+ {
|
|
|
+ Integer num = versionCount.get(dn.getSoftwareVersion());
|
|
|
+ num = num == null ? 1 : num+1;
|
|
|
+ versionCount.put(dn.getSoftwareVersion(), num);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ this.datanodesSoftwareVersions = versionCount;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public HashMap<String, Integer> getDatanodesSoftwareVersions() {
|
|
|
+ synchronized(datanodeMap) {
|
|
|
+ return new HashMap<String, Integer> (this.datanodesSoftwareVersions);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/* Resolve a node's network location */
|
|
|
private String resolveNetworkLocation (DatanodeID node) {
|
|
|
List<String> names = new ArrayList<String>(1);
|
|
@@ -761,21 +818,28 @@ public class DatanodeManager {
|
|
|
try {
|
|
|
// update cluster map
|
|
|
getNetworkTopology().remove(nodeS);
|
|
|
+ if(shouldCountVersion(nodeS)) {
|
|
|
+ decrementVersionCount(nodeS.getSoftwareVersion());
|
|
|
+ }
|
|
|
nodeS.updateRegInfo(nodeReg);
|
|
|
+
|
|
|
+ nodeS.setSoftwareVersion(nodeReg.getSoftwareVersion());
|
|
|
nodeS.setDisallowed(false); // Node is in the include list
|
|
|
-
|
|
|
+
|
|
|
// resolve network location
|
|
|
nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
|
|
|
getNetworkTopology().add(nodeS);
|
|
|
|
|
|
// also treat the registration message as a heartbeat
|
|
|
heartbeatManager.register(nodeS);
|
|
|
+ incrementVersionCount(nodeS.getSoftwareVersion());
|
|
|
checkDecommissioning(nodeS);
|
|
|
success = true;
|
|
|
} finally {
|
|
|
if (!success) {
|
|
|
removeDatanode(nodeS);
|
|
|
wipeDatanode(nodeS);
|
|
|
+ countSoftwareVersions();
|
|
|
}
|
|
|
}
|
|
|
return;
|
|
@@ -799,6 +863,7 @@ public class DatanodeManager {
|
|
|
try {
|
|
|
nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
|
|
|
networktopology.add(nodeDescr);
|
|
|
+ nodeDescr.setSoftwareVersion(nodeReg.getSoftwareVersion());
|
|
|
|
|
|
// register new datanode
|
|
|
addDatanode(nodeDescr);
|
|
@@ -809,10 +874,12 @@ public class DatanodeManager {
|
|
|
// because its is done when the descriptor is created
|
|
|
heartbeatManager.addDatanode(nodeDescr);
|
|
|
success = true;
|
|
|
+ incrementVersionCount(nodeReg.getSoftwareVersion());
|
|
|
} finally {
|
|
|
if (!success) {
|
|
|
removeDatanode(nodeDescr);
|
|
|
wipeDatanode(nodeDescr);
|
|
|
+ countSoftwareVersions();
|
|
|
}
|
|
|
}
|
|
|
} catch (InvalidTopologyException e) {
|
|
@@ -834,6 +901,7 @@ public class DatanodeManager {
|
|
|
namesystem.writeLock();
|
|
|
try {
|
|
|
refreshDatanodes();
|
|
|
+ countSoftwareVersions();
|
|
|
} finally {
|
|
|
namesystem.writeUnlock();
|
|
|
}
|