|
@@ -25,8 +25,9 @@ import java.util.Collection;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.NavigableSet;
|
|
|
+import java.util.NoSuchElementException;
|
|
|
import java.util.SortedMap;
|
|
|
-import java.util.SortedSet;
|
|
|
import java.util.TreeMap;
|
|
|
import java.util.TreeSet;
|
|
|
|
|
@@ -36,6 +37,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.UnresolvedLinkException;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
|
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
|
import org.apache.hadoop.util.Daemon;
|
|
|
|
|
@@ -79,7 +81,7 @@ public class LeaseManager {
|
|
|
//
|
|
|
private final SortedMap<String, Lease> leases = new TreeMap<String, Lease>();
|
|
|
// Set of: Lease
|
|
|
- private final SortedSet<Lease> sortedLeases = new TreeSet<Lease>();
|
|
|
+ private final NavigableSet<Lease> sortedLeases = new TreeSet<Lease>();
|
|
|
|
|
|
//
|
|
|
// Map path names to leases. It is protected by the sortedLeases lock.
|
|
@@ -95,8 +97,41 @@ public class LeaseManager {
|
|
|
Lease getLease(String holder) {
|
|
|
return leases.get(holder);
|
|
|
}
|
|
|
-
|
|
|
- SortedSet<Lease> getSortedLeases() {return sortedLeases;}
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ int getNumSortedLeases() {return sortedLeases.size();}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This method iterates through all the leases and counts the number of blocks
|
|
|
+ * which are not COMPLETE. The FSNamesystem read lock MUST be held before
|
|
|
+ * calling this method.
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ synchronized long getNumUnderConstructionBlocks() {
|
|
|
+ assert this.fsnamesystem.hasReadLock() : "The FSNamesystem read lock wasn't"
|
|
|
+ + "acquired before counting under construction blocks";
|
|
|
+ long numUCBlocks = 0;
|
|
|
+ for (Lease lease : sortedLeases) {
|
|
|
+ for (String path : lease.getPaths()) {
|
|
|
+ final INodeFile cons;
|
|
|
+ try {
|
|
|
+ cons = this.fsnamesystem.getFSDirectory().getINode(path).asFile();
|
|
|
+ Preconditions.checkState(cons.isUnderConstruction());
|
|
|
+ } catch (UnresolvedLinkException e) {
|
|
|
+ throw new AssertionError("Lease files should reside on this FS");
|
|
|
+ }
|
|
|
+ BlockInfo[] blocks = cons.getBlocks();
|
|
|
+ if(blocks == null)
|
|
|
+ continue;
|
|
|
+ for(BlockInfo b : blocks) {
|
|
|
+ if(!b.isComplete())
|
|
|
+ numUCBlocks++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ LOG.info("Number of blocks under construction: " + numUCBlocks);
|
|
|
+ return numUCBlocks;
|
|
|
+ }
|
|
|
|
|
|
/** @return the lease containing src */
|
|
|
public Lease getLeaseByPath(String src) {return sortedLeasesByPath.get(src);}
|
|
@@ -421,33 +456,38 @@ public class LeaseManager {
|
|
|
/** Check the leases beginning from the oldest.
|
|
|
* @return true is sync is needed.
|
|
|
*/
|
|
|
- private synchronized boolean checkLeases() {
|
|
|
+ @VisibleForTesting
|
|
|
+ synchronized boolean checkLeases() {
|
|
|
boolean needSync = false;
|
|
|
assert fsnamesystem.hasWriteLock();
|
|
|
- for(; sortedLeases.size() > 0; ) {
|
|
|
- final Lease oldest = sortedLeases.first();
|
|
|
- if (!oldest.expiredHardLimit()) {
|
|
|
- return needSync;
|
|
|
+ Lease leaseToCheck = null;
|
|
|
+ try {
|
|
|
+ leaseToCheck = sortedLeases.first();
|
|
|
+ } catch(NoSuchElementException e) {}
|
|
|
+
|
|
|
+ while(leaseToCheck != null) {
|
|
|
+ if (!leaseToCheck.expiredHardLimit()) {
|
|
|
+ break;
|
|
|
}
|
|
|
|
|
|
- LOG.info(oldest + " has expired hard limit");
|
|
|
+ LOG.info(leaseToCheck + " has expired hard limit");
|
|
|
|
|
|
final List<String> removing = new ArrayList<String>();
|
|
|
- // need to create a copy of the oldest lease paths, becuase
|
|
|
+ // need to create a copy of the oldest lease paths, because
|
|
|
// internalReleaseLease() removes paths corresponding to empty files,
|
|
|
// i.e. it needs to modify the collection being iterated over
|
|
|
// causing ConcurrentModificationException
|
|
|
- String[] leasePaths = new String[oldest.getPaths().size()];
|
|
|
- oldest.getPaths().toArray(leasePaths);
|
|
|
+ String[] leasePaths = new String[leaseToCheck.getPaths().size()];
|
|
|
+ leaseToCheck.getPaths().toArray(leasePaths);
|
|
|
for(String p : leasePaths) {
|
|
|
try {
|
|
|
- boolean completed = fsnamesystem.internalReleaseLease(oldest, p,
|
|
|
+ boolean completed = fsnamesystem.internalReleaseLease(leaseToCheck, p,
|
|
|
HdfsServerConstants.NAMENODE_LEASE_HOLDER);
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
if (completed) {
|
|
|
LOG.debug("Lease recovery for " + p + " is complete. File closed.");
|
|
|
} else {
|
|
|
- LOG.debug("Started block recovery " + p + " lease " + oldest);
|
|
|
+ LOG.debug("Started block recovery " + p + " lease " + leaseToCheck);
|
|
|
}
|
|
|
}
|
|
|
// If a lease recovery happened, we need to sync later.
|
|
@@ -456,15 +496,23 @@ public class LeaseManager {
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
LOG.error("Cannot release the path " + p + " in the lease "
|
|
|
- + oldest, e);
|
|
|
+ + leaseToCheck, e);
|
|
|
removing.add(p);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
for(String p : removing) {
|
|
|
- removeLease(oldest, p);
|
|
|
+ removeLease(leaseToCheck, p);
|
|
|
}
|
|
|
+ leaseToCheck = sortedLeases.higher(leaseToCheck);
|
|
|
}
|
|
|
+
|
|
|
+ try {
|
|
|
+ if(leaseToCheck != sortedLeases.first()) {
|
|
|
+ LOG.warn("Unable to release hard-limit expired lease: "
|
|
|
+ + sortedLeases.first());
|
|
|
+ }
|
|
|
+ } catch(NoSuchElementException e) {}
|
|
|
return needSync;
|
|
|
}
|
|
|
|