|
@@ -1216,6 +1216,20 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
recentInvalidateSets.put(n.getStorageID(), invalidateSet);
|
|
recentInvalidateSets.put(n.getStorageID(), invalidateSet);
|
|
}
|
|
}
|
|
invalidateSet.add(b);
|
|
invalidateSet.add(b);
|
|
|
|
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.delete: "
|
|
|
|
+ + b.getBlockName() + " is added to invalidSet of " + n.getName());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Adds block to list of blocks which will be invalidated on
|
|
|
|
+ * all its datanodes.
|
|
|
|
+ */
|
|
|
|
+ private void addToInvalidates(Block b) {
|
|
|
|
+ for (Iterator<DatanodeDescriptor> it =
|
|
|
|
+ blocksMap.nodeIterator(b); it.hasNext();) {
|
|
|
|
+ DatanodeDescriptor node = it.next();
|
|
|
|
+ addToInvalidates(b, node);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -1373,14 +1387,7 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
for (Block b : deletedBlocks) {
|
|
for (Block b : deletedBlocks) {
|
|
- for (Iterator<DatanodeDescriptor> it =
|
|
|
|
- blocksMap.nodeIterator(b); it.hasNext();) {
|
|
|
|
- DatanodeDescriptor node = it.next();
|
|
|
|
- addToInvalidates(b, node);
|
|
|
|
- NameNode.stateChangeLog.info("BLOCK* NameSystem.delete: "
|
|
|
|
- + b.getBlockName() + " is added to invalidSet of "
|
|
|
|
- + node.getName());
|
|
|
|
- }
|
|
|
|
|
|
+ addToInvalidates(b);
|
|
}
|
|
}
|
|
if (old.isUnderConstruction()) {
|
|
if (old.isUnderConstruction()) {
|
|
INodeFileUnderConstruction cons = (INodeFileUnderConstruction) old;
|
|
INodeFileUnderConstruction cons = (INodeFileUnderConstruction) old;
|
|
@@ -2574,7 +2581,11 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
if (fileINode.isUnderConstruction()) {
|
|
if (fileINode.isUnderConstruction()) {
|
|
return block;
|
|
return block;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ // do not handle mis-replicated blocks during startup
|
|
|
|
+ if(isInSafeMode())
|
|
|
|
+ return block;
|
|
|
|
+
|
|
// handle underReplication/overReplication
|
|
// handle underReplication/overReplication
|
|
short fileReplication = fileINode.getReplication();
|
|
short fileReplication = fileINode.getReplication();
|
|
if (numCurrentReplica >= fileReplication) {
|
|
if (numCurrentReplica >= fileReplication) {
|
|
@@ -2588,7 +2599,47 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
}
|
|
}
|
|
return block;
|
|
return block;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * For each block in the name-node verify whether it belongs to any file,
|
|
|
|
+ * over or under replicated. Place it into the respective queue.
|
|
|
|
+ */
|
|
|
|
+ private synchronized void processMisReplicatedBlocks() {
|
|
|
|
+ long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0;
|
|
|
|
+ neededReplications.clear();
|
|
|
|
+ excessReplicateMap.clear();
|
|
|
|
+ for(BlocksMap.BlockInfo block : blocksMap.getBlocks()) {
|
|
|
|
+ INodeFile fileINode = block.getINode();
|
|
|
|
+ if(fileINode == null) {
|
|
|
|
+ // block does not belong to any file
|
|
|
|
+ nrInvalid++;
|
|
|
|
+ addToInvalidates(block);
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ // calculate current replication
|
|
|
|
+ short expectedReplication = fileINode.getReplication();
|
|
|
|
+ NumberReplicas num = countNodes(block);
|
|
|
|
+ int numCurrentReplica = num.liveReplicas();
|
|
|
|
+ // add to under-replicated queue if need to be
|
|
|
|
+ if (neededReplications.add(block,
|
|
|
|
+ numCurrentReplica,
|
|
|
|
+ num.decommissionedReplicas(),
|
|
|
|
+ expectedReplication)) {
|
|
|
|
+ nrUnderReplicated++;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (numCurrentReplica > expectedReplication) {
|
|
|
|
+ // over-replicated block
|
|
|
|
+ nrOverReplicated++;
|
|
|
|
+ proccessOverReplicatedBlock(block, expectedReplication, null, null);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ LOG.info("Total number of blocks = " + blocksMap.size());
|
|
|
|
+ LOG.info("Number of invalid blocks = " + nrInvalid);
|
|
|
|
+ LOG.info("Number of under-replicated blocks = " + nrUnderReplicated);
|
|
|
|
+ LOG.info("Number of over-replicated blocks = " + nrOverReplicated);
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Find how many of the containing nodes are "extra", if any.
|
|
* Find how many of the containing nodes are "extra", if any.
|
|
* If there are any extras, call chooseExcessReplicates() to
|
|
* If there are any extras, call chooseExcessReplicates() to
|
|
@@ -3335,8 +3386,8 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
* of blocks in the system, which is the size of
|
|
* of blocks in the system, which is the size of
|
|
* {@link FSNamesystem#blocksMap}. When the ratio reaches the
|
|
* {@link FSNamesystem#blocksMap}. When the ratio reaches the
|
|
* {@link #threshold} it starts the {@link SafeModeMonitor} daemon in order
|
|
* {@link #threshold} it starts the {@link SafeModeMonitor} daemon in order
|
|
- * to monitor whether the safe mode extension is passed. Then it leaves safe
|
|
|
|
- * mode and destroys itself.
|
|
|
|
|
|
+ * to monitor whether the safe mode {@link #extension} is passed.
|
|
|
|
+ * Then it leaves safe mode and destroys itself.
|
|
* <p>
|
|
* <p>
|
|
* If safe mode is turned on manually then the number of safe blocks is
|
|
* If safe mode is turned on manually then the number of safe blocks is
|
|
* not tracked because the name node is not intended to leave safe mode
|
|
* not tracked because the name node is not intended to leave safe mode
|
|
@@ -3425,9 +3476,12 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
|
|
|
|
/**
|
|
/**
|
|
* Leave safe mode.
|
|
* Leave safe mode.
|
|
- * Switch to manual safe mode if distributed upgrade is required.
|
|
|
|
|
|
+ * <p>
|
|
|
|
+ * Switch to manual safe mode if distributed upgrade is required.<br>
|
|
|
|
+ * Check for invalid, under- & over-replicated blocks in the end of startup.
|
|
*/
|
|
*/
|
|
- synchronized void leave(boolean checkForUpgrades) {
|
|
|
|
|
|
+ synchronized void leave(boolean checkForUpgrades,
|
|
|
|
+ boolean checkBlockReplication) {
|
|
if(checkForUpgrades) {
|
|
if(checkForUpgrades) {
|
|
// verify whether a distributed upgrade needs to be started
|
|
// verify whether a distributed upgrade needs to be started
|
|
boolean needUpgrade = false;
|
|
boolean needUpgrade = false;
|
|
@@ -3442,6 +3496,9 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ if(checkBlockReplication)
|
|
|
|
+ // verify blocks replications
|
|
|
|
+ processMisReplicatedBlocks();
|
|
long timeInSafemode = now() - systemStart;
|
|
long timeInSafemode = now() - systemStart;
|
|
NameNode.stateChangeLog.info("STATE* Leaving safe mode after "
|
|
NameNode.stateChangeLog.info("STATE* Leaving safe mode after "
|
|
+ timeInSafemode/1000 + " secs.");
|
|
+ timeInSafemode/1000 + " secs.");
|
|
@@ -3503,7 +3560,7 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
// the threshold is reached
|
|
// the threshold is reached
|
|
if (!isOn() || // safe mode is off
|
|
if (!isOn() || // safe mode is off
|
|
extension <= 0 || threshold <= 0) { // don't need to wait
|
|
extension <= 0 || threshold <= 0) { // don't need to wait
|
|
- this.leave(true); // leave safe mode
|
|
|
|
|
|
+ this.leave(true, false); // leave safe mode
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
if (reached > 0) { // threshold has already been reached before
|
|
if (reached > 0) { // threshold has already been reached before
|
|
@@ -3639,8 +3696,9 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
} catch (InterruptedException ie) {
|
|
} catch (InterruptedException ie) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- // leave safe mode an stop the monitor
|
|
|
|
- safeMode.leave(true);
|
|
|
|
|
|
+ // leave safe mode and stop the monitor
|
|
|
|
+ if(safeMode != null)
|
|
|
|
+ safeMode.leave(true, true);
|
|
smmthread = null;
|
|
smmthread = null;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -3658,7 +3716,7 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
checkSuperuserPrivilege();
|
|
checkSuperuserPrivilege();
|
|
switch(action) {
|
|
switch(action) {
|
|
case SAFEMODE_LEAVE: // leave safe mode
|
|
case SAFEMODE_LEAVE: // leave safe mode
|
|
- leaveSafeMode(false);
|
|
|
|
|
|
+ leaveSafeMode(false, false);
|
|
break;
|
|
break;
|
|
case SAFEMODE_ENTER: // enter safe mode
|
|
case SAFEMODE_ENTER: // enter safe mode
|
|
enterSafeMode();
|
|
enterSafeMode();
|
|
@@ -3729,7 +3787,9 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
* Leave safe mode.
|
|
* Leave safe mode.
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- synchronized void leaveSafeMode(boolean checkForUpgrades) throws IOException {
|
|
|
|
|
|
+ synchronized void leaveSafeMode(boolean checkForUpgrades,
|
|
|
|
+ boolean checkBlockReplication
|
|
|
|
+ ) throws IOException {
|
|
if (!isInSafeMode()) {
|
|
if (!isInSafeMode()) {
|
|
NameNode.stateChangeLog.info("STATE* Safe mode is already OFF.");
|
|
NameNode.stateChangeLog.info("STATE* Safe mode is already OFF.");
|
|
return;
|
|
return;
|
|
@@ -3737,7 +3797,7 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
|
|
if(getDistributedUpgradeState())
|
|
if(getDistributedUpgradeState())
|
|
throw new SafeModeException("Distributed upgrade is in progress",
|
|
throw new SafeModeException("Distributed upgrade is in progress",
|
|
safeMode);
|
|
safeMode);
|
|
- safeMode.leave(checkForUpgrades);
|
|
|
|
|
|
+ safeMode.leave(checkForUpgrades, checkBlockReplication);
|
|
}
|
|
}
|
|
|
|
|
|
String getSafeModeTip() {
|
|
String getSafeModeTip() {
|