|
@@ -22,20 +22,17 @@ import static org.apache.hadoop.util.Time.monotonicNow;
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.Comparator;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.NavigableSet;
|
|
|
-import java.util.NoSuchElementException;
|
|
|
+import java.util.PriorityQueue;
|
|
|
import java.util.SortedMap;
|
|
|
import java.util.TreeMap;
|
|
|
-import java.util.TreeSet;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
-import org.apache.hadoop.fs.Path;
|
|
|
-import org.apache.hadoop.fs.UnresolvedLinkException;
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
|
import org.apache.hadoop.util.Daemon;
|
|
@@ -78,15 +75,17 @@ public class LeaseManager {
|
|
|
// Used for handling lock-leases
|
|
|
// Mapping: leaseHolder -> Lease
|
|
|
//
|
|
|
- private final SortedMap<String, Lease> leases = new TreeMap<String, Lease>();
|
|
|
+ private final SortedMap<String, Lease> leases = new TreeMap<>();
|
|
|
// Set of: Lease
|
|
|
- private final NavigableSet<Lease> sortedLeases = new TreeSet<Lease>();
|
|
|
-
|
|
|
- //
|
|
|
- // Map path names to leases. It is protected by the sortedLeases lock.
|
|
|
- // The map stores pathnames in lexicographical order.
|
|
|
- //
|
|
|
- private final SortedMap<String, Lease> sortedLeasesByPath = new TreeMap<String, Lease>();
|
|
|
+ private final PriorityQueue<Lease> sortedLeases = new PriorityQueue<>(512,
|
|
|
+ new Comparator<Lease>() {
|
|
|
+ @Override
|
|
|
+ public int compare(Lease o1, Lease o2) {
|
|
|
+ return Long.signum(o1.getLastUpdate() - o2.getLastUpdate());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ // INodeID -> Lease
|
|
|
+ private final HashMap<Long, Lease> leasesById = new HashMap<>();
|
|
|
|
|
|
private Daemon lmthread;
|
|
|
private volatile boolean shouldRunMonitor;
|
|
@@ -97,60 +96,44 @@ public class LeaseManager {
|
|
|
return leases.get(holder);
|
|
|
}
|
|
|
|
|
|
- @VisibleForTesting
|
|
|
- int getNumSortedLeases() {return sortedLeases.size();}
|
|
|
-
|
|
|
/**
|
|
|
* This method iterates through all the leases and counts the number of blocks
|
|
|
* which are not COMPLETE. The FSNamesystem read lock MUST be held before
|
|
|
* calling this method.
|
|
|
- * @return
|
|
|
*/
|
|
|
synchronized long getNumUnderConstructionBlocks() {
|
|
|
assert this.fsnamesystem.hasReadLock() : "The FSNamesystem read lock wasn't"
|
|
|
+ "acquired before counting under construction blocks";
|
|
|
long numUCBlocks = 0;
|
|
|
- for (Lease lease : sortedLeases) {
|
|
|
- for (String path : lease.getPaths()) {
|
|
|
- final INodeFile cons;
|
|
|
- try {
|
|
|
- cons = this.fsnamesystem.getFSDirectory().getINode(path).asFile();
|
|
|
- Preconditions.checkState(cons.isUnderConstruction());
|
|
|
- } catch (UnresolvedLinkException e) {
|
|
|
- throw new AssertionError("Lease files should reside on this FS");
|
|
|
- }
|
|
|
- BlockInfoContiguous[] blocks = cons.getBlocks();
|
|
|
- if(blocks == null)
|
|
|
- continue;
|
|
|
- for(BlockInfoContiguous b : blocks) {
|
|
|
- if(!b.isComplete())
|
|
|
- numUCBlocks++;
|
|
|
- }
|
|
|
+ for (Long id : getINodeIdWithLeases()) {
|
|
|
+ final INodeFile cons = fsnamesystem.getFSDirectory().getInode(id).asFile();
|
|
|
+ Preconditions.checkState(cons.isUnderConstruction());
|
|
|
+ BlockInfoContiguous[] blocks = cons.getBlocks();
|
|
|
+ if(blocks == null) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ for(BlockInfoContiguous b : blocks) {
|
|
|
+ if(!b.isComplete())
|
|
|
+ numUCBlocks++;
|
|
|
}
|
|
|
}
|
|
|
LOG.info("Number of blocks under construction: " + numUCBlocks);
|
|
|
return numUCBlocks;
|
|
|
}
|
|
|
|
|
|
+ Collection<Long> getINodeIdWithLeases() {return leasesById.keySet();}
|
|
|
+
|
|
|
/** @return the lease containing src */
|
|
|
- public Lease getLeaseByPath(String src) {return sortedLeasesByPath.get(src);}
|
|
|
+ public synchronized Lease getLease(INodeFile src) {return leasesById.get(src.getId());}
|
|
|
|
|
|
/** @return the number of leases currently in the system */
|
|
|
+ @VisibleForTesting
|
|
|
public synchronized int countLease() {return sortedLeases.size();}
|
|
|
|
|
|
- /** @return the number of paths contained in all leases */
|
|
|
- synchronized int countPath() {
|
|
|
- int count = 0;
|
|
|
- for(Lease lease : sortedLeases) {
|
|
|
- count += lease.getPaths().size();
|
|
|
- }
|
|
|
- return count;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Adds (or re-adds) the lease for the specified file.
|
|
|
*/
|
|
|
- synchronized Lease addLease(String holder, String src) {
|
|
|
+ synchronized Lease addLease(String holder, long inodeId) {
|
|
|
Lease lease = getLease(holder);
|
|
|
if (lease == null) {
|
|
|
lease = new Lease(holder);
|
|
@@ -159,23 +142,24 @@ public class LeaseManager {
|
|
|
} else {
|
|
|
renewLease(lease);
|
|
|
}
|
|
|
- sortedLeasesByPath.put(src, lease);
|
|
|
- lease.paths.add(src);
|
|
|
+ leasesById.put(inodeId, lease);
|
|
|
+ lease.files.add(inodeId);
|
|
|
return lease;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Remove the specified lease and src.
|
|
|
*/
|
|
|
- synchronized void removeLease(Lease lease, String src) {
|
|
|
- sortedLeasesByPath.remove(src);
|
|
|
- if (!lease.removePath(src)) {
|
|
|
+ private synchronized void removeLease(Lease lease, long inodeId) {
|
|
|
+ leasesById.remove(inodeId);
|
|
|
+ if (!lease.removeFile(inodeId)) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(src + " not found in lease.paths (=" + lease.paths + ")");
|
|
|
+ LOG.debug("inode " + inodeId + " not found in lease.files (=" + lease
|
|
|
+ + ")");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if (!lease.hasPath()) {
|
|
|
+ if (!lease.hasFiles()) {
|
|
|
leases.remove(lease.holder);
|
|
|
if (!sortedLeases.remove(lease)) {
|
|
|
LOG.error(lease + " not found in sortedLeases");
|
|
@@ -186,31 +170,32 @@ public class LeaseManager {
|
|
|
/**
|
|
|
* Remove the lease for the specified holder and src
|
|
|
*/
|
|
|
- synchronized void removeLease(String holder, String src) {
|
|
|
+ synchronized void removeLease(String holder, INodeFile src) {
|
|
|
Lease lease = getLease(holder);
|
|
|
if (lease != null) {
|
|
|
- removeLease(lease, src);
|
|
|
+ removeLease(lease, src.getId());
|
|
|
} else {
|
|
|
LOG.warn("Removing non-existent lease! holder=" + holder +
|
|
|
- " src=" + src);
|
|
|
+ " src=" + src.getFullPathName());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
synchronized void removeAllLeases() {
|
|
|
sortedLeases.clear();
|
|
|
- sortedLeasesByPath.clear();
|
|
|
+ leasesById.clear();
|
|
|
leases.clear();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Reassign lease for file src to the new holder.
|
|
|
*/
|
|
|
- synchronized Lease reassignLease(Lease lease, String src, String newHolder) {
|
|
|
+ synchronized Lease reassignLease(Lease lease, INodeFile src,
|
|
|
+ String newHolder) {
|
|
|
assert newHolder != null : "new lease holder is null";
|
|
|
if (lease != null) {
|
|
|
- removeLease(lease, src);
|
|
|
+ removeLease(lease, src.getId());
|
|
|
}
|
|
|
- return addLease(newHolder, src);
|
|
|
+ return addLease(newHolder, src.getId());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -243,10 +228,10 @@ public class LeaseManager {
|
|
|
* checks in. If the client dies and allows its lease to
|
|
|
* expire, all the corresponding locks can be released.
|
|
|
*************************************************************/
|
|
|
- class Lease implements Comparable<Lease> {
|
|
|
+ class Lease {
|
|
|
private final String holder;
|
|
|
private long lastUpdate;
|
|
|
- private final Collection<String> paths = new TreeSet<String>();
|
|
|
+ private final HashSet<Long> files = new HashSet<>();
|
|
|
|
|
|
/** Only LeaseManager object can create a lease */
|
|
|
private Lease(String holder) {
|
|
@@ -269,127 +254,43 @@ public class LeaseManager {
|
|
|
}
|
|
|
|
|
|
/** Does this lease contain any path? */
|
|
|
- boolean hasPath() {return !paths.isEmpty();}
|
|
|
+ boolean hasFiles() {return !files.isEmpty();}
|
|
|
|
|
|
- boolean removePath(String src) {
|
|
|
- return paths.remove(src);
|
|
|
+ boolean removeFile(long inodeId) {
|
|
|
+ return files.remove(inodeId);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
return "[Lease. Holder: " + holder
|
|
|
- + ", pendingcreates: " + paths.size() + "]";
|
|
|
+ + ", pending creates: " + files.size() + "]";
|
|
|
}
|
|
|
-
|
|
|
- @Override
|
|
|
- public int compareTo(Lease o) {
|
|
|
- Lease l1 = this;
|
|
|
- Lease l2 = o;
|
|
|
- long lu1 = l1.lastUpdate;
|
|
|
- long lu2 = l2.lastUpdate;
|
|
|
- if (lu1 < lu2) {
|
|
|
- return -1;
|
|
|
- } else if (lu1 > lu2) {
|
|
|
- return 1;
|
|
|
- } else {
|
|
|
- return l1.holder.compareTo(l2.holder);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public boolean equals(Object o) {
|
|
|
- if (!(o instanceof Lease)) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- Lease obj = (Lease) o;
|
|
|
- if (lastUpdate == obj.lastUpdate &&
|
|
|
- holder.equals(obj.holder)) {
|
|
|
- return true;
|
|
|
- }
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
public int hashCode() {
|
|
|
return holder.hashCode();
|
|
|
}
|
|
|
|
|
|
- Collection<String> getPaths() {
|
|
|
- return paths;
|
|
|
- }
|
|
|
+ private Collection<Long> getFiles() { return files; }
|
|
|
|
|
|
String getHolder() {
|
|
|
return holder;
|
|
|
}
|
|
|
|
|
|
- void replacePath(String oldpath, String newpath) {
|
|
|
- paths.remove(oldpath);
|
|
|
- paths.add(newpath);
|
|
|
- }
|
|
|
-
|
|
|
@VisibleForTesting
|
|
|
long getLastUpdate() {
|
|
|
return lastUpdate;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- synchronized void changeLease(String src, String dst) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(getClass().getSimpleName() + ".changelease: " +
|
|
|
- " src=" + src + ", dest=" + dst);
|
|
|
- }
|
|
|
-
|
|
|
- final int len = src.length();
|
|
|
- for(Map.Entry<String, Lease> entry
|
|
|
- : findLeaseWithPrefixPath(src, sortedLeasesByPath).entrySet()) {
|
|
|
- final String oldpath = entry.getKey();
|
|
|
- final Lease lease = entry.getValue();
|
|
|
- // replace stem of src with new destination
|
|
|
- final String newpath = dst + oldpath.substring(len);
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("changeLease: replacing " + oldpath + " with " + newpath);
|
|
|
- }
|
|
|
- lease.replacePath(oldpath, newpath);
|
|
|
- sortedLeasesByPath.remove(oldpath);
|
|
|
- sortedLeasesByPath.put(newpath, lease);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- synchronized void removeLeaseWithPrefixPath(String prefix) {
|
|
|
- for(Map.Entry<String, Lease> entry
|
|
|
- : findLeaseWithPrefixPath(prefix, sortedLeasesByPath).entrySet()) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(LeaseManager.class.getSimpleName()
|
|
|
- + ".removeLeaseWithPrefixPath: entry=" + entry);
|
|
|
- }
|
|
|
- removeLease(entry.getValue(), entry.getKey());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- static private Map<String, Lease> findLeaseWithPrefixPath(
|
|
|
- String prefix, SortedMap<String, Lease> path2lease) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(LeaseManager.class.getSimpleName() + ".findLease: prefix=" + prefix);
|
|
|
- }
|
|
|
-
|
|
|
- final Map<String, Lease> entries = new HashMap<String, Lease>();
|
|
|
- int srclen = prefix.length();
|
|
|
-
|
|
|
- // prefix may ended with '/'
|
|
|
- if (prefix.charAt(srclen - 1) == Path.SEPARATOR_CHAR) {
|
|
|
- srclen -= 1;
|
|
|
- }
|
|
|
-
|
|
|
- for(Map.Entry<String, Lease> entry : path2lease.tailMap(prefix).entrySet()) {
|
|
|
- final String p = entry.getKey();
|
|
|
- if (!p.startsWith(prefix)) {
|
|
|
- return entries;
|
|
|
- }
|
|
|
- if (p.length() == srclen || p.charAt(srclen) == Path.SEPARATOR_CHAR) {
|
|
|
- entries.put(entry.getKey(), entry.getValue());
|
|
|
+ @VisibleForTesting
|
|
|
+ synchronized void removeLeases(Collection<Long> inodes) {
|
|
|
+ for (long inode : inodes) {
|
|
|
+ Lease lease = leasesById.get(inode);
|
|
|
+ if (lease != null) {
|
|
|
+ removeLease(lease, inode);
|
|
|
}
|
|
|
}
|
|
|
- return entries;
|
|
|
}
|
|
|
|
|
|
public void setLeasePeriod(long softLimit, long hardLimit) {
|
|
@@ -428,30 +329,13 @@ public class LeaseManager {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug(name + " is interrupted", ie);
|
|
|
}
|
|
|
+ } catch(Throwable e) {
|
|
|
+ LOG.warn("Unexpected throwable: ", e);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Get the list of inodes corresponding to valid leases.
|
|
|
- * @return list of inodes
|
|
|
- */
|
|
|
- Map<String, INodeFile> getINodesUnderConstruction() {
|
|
|
- Map<String, INodeFile> inodes = new TreeMap<String, INodeFile>();
|
|
|
- for (String p : sortedLeasesByPath.keySet()) {
|
|
|
- // verify that path exists in namespace
|
|
|
- try {
|
|
|
- INodeFile node = INodeFile.valueOf(fsnamesystem.dir.getINode(p), p);
|
|
|
- Preconditions.checkState(node.isUnderConstruction());
|
|
|
- inodes.put(p, node);
|
|
|
- } catch (IOException ioe) {
|
|
|
- LOG.error(ioe);
|
|
|
- }
|
|
|
- }
|
|
|
- return inodes;
|
|
|
- }
|
|
|
-
|
|
|
/** Check the leases beginning from the oldest.
|
|
|
* @return true is sync is needed.
|
|
|
*/
|
|
@@ -459,34 +343,35 @@ public class LeaseManager {
|
|
|
synchronized boolean checkLeases() {
|
|
|
boolean needSync = false;
|
|
|
assert fsnamesystem.hasWriteLock();
|
|
|
- Lease leaseToCheck = null;
|
|
|
- try {
|
|
|
- leaseToCheck = sortedLeases.first();
|
|
|
- } catch(NoSuchElementException e) {}
|
|
|
-
|
|
|
- while(leaseToCheck != null) {
|
|
|
- if (!leaseToCheck.expiredHardLimit()) {
|
|
|
- break;
|
|
|
- }
|
|
|
|
|
|
+ while(!sortedLeases.isEmpty() && sortedLeases.peek().expiredHardLimit()) {
|
|
|
+ Lease leaseToCheck = sortedLeases.poll();
|
|
|
LOG.info(leaseToCheck + " has expired hard limit");
|
|
|
|
|
|
- final List<String> removing = new ArrayList<String>();
|
|
|
- // need to create a copy of the oldest lease paths, because
|
|
|
- // internalReleaseLease() removes paths corresponding to empty files,
|
|
|
+ final List<Long> removing = new ArrayList<>();
|
|
|
+ // need to create a copy of the oldest lease files, because
|
|
|
+ // internalReleaseLease() removes files corresponding to empty files,
|
|
|
// i.e. it needs to modify the collection being iterated over
|
|
|
// causing ConcurrentModificationException
|
|
|
- String[] leasePaths = new String[leaseToCheck.getPaths().size()];
|
|
|
- leaseToCheck.getPaths().toArray(leasePaths);
|
|
|
- for(String p : leasePaths) {
|
|
|
+ Collection<Long> files = leaseToCheck.getFiles();
|
|
|
+ Long[] leaseINodeIds = files.toArray(new Long[files.size()]);
|
|
|
+ FSDirectory fsd = fsnamesystem.getFSDirectory();
|
|
|
+ String p = null;
|
|
|
+ for(Long id : leaseINodeIds) {
|
|
|
try {
|
|
|
- INodesInPath iip = fsnamesystem.getFSDirectory().getINodesInPath(p,
|
|
|
- true);
|
|
|
- boolean completed = fsnamesystem.internalReleaseLease(leaseToCheck, p,
|
|
|
- iip, HdfsServerConstants.NAMENODE_LEASE_HOLDER);
|
|
|
+ INodesInPath iip = INodesInPath.fromINode(fsd.getInode(id));
|
|
|
+ p = iip.getPath();
|
|
|
+ // Sanity check to make sure the path is correct
|
|
|
+ if (!p.startsWith("/")) {
|
|
|
+ throw new IOException("Invalid path in the lease " + p);
|
|
|
+ }
|
|
|
+ boolean completed = fsnamesystem.internalReleaseLease(
|
|
|
+ leaseToCheck, p, iip,
|
|
|
+ HdfsServerConstants.NAMENODE_LEASE_HOLDER);
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
if (completed) {
|
|
|
- LOG.debug("Lease recovery for " + p + " is complete. File closed.");
|
|
|
+ LOG.debug("Lease recovery for inode " + id + " is complete. " +
|
|
|
+ "File closed.");
|
|
|
} else {
|
|
|
LOG.debug("Started block recovery " + p + " lease " + leaseToCheck);
|
|
|
}
|
|
@@ -498,22 +383,15 @@ public class LeaseManager {
|
|
|
} catch (IOException e) {
|
|
|
LOG.error("Cannot release the path " + p + " in the lease "
|
|
|
+ leaseToCheck, e);
|
|
|
- removing.add(p);
|
|
|
+ removing.add(id);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- for(String p : removing) {
|
|
|
- removeLease(leaseToCheck, p);
|
|
|
+ for(Long id : removing) {
|
|
|
+ removeLease(leaseToCheck, id);
|
|
|
}
|
|
|
- leaseToCheck = sortedLeases.higher(leaseToCheck);
|
|
|
}
|
|
|
|
|
|
- try {
|
|
|
- if(leaseToCheck != sortedLeases.first()) {
|
|
|
- LOG.warn("Unable to release hard-limit expired lease: "
|
|
|
- + sortedLeases.first());
|
|
|
- }
|
|
|
- } catch(NoSuchElementException e) {}
|
|
|
return needSync;
|
|
|
}
|
|
|
|
|
@@ -522,7 +400,7 @@ public class LeaseManager {
|
|
|
return getClass().getSimpleName() + "= {"
|
|
|
+ "\n leases=" + leases
|
|
|
+ "\n sortedLeases=" + sortedLeases
|
|
|
- + "\n sortedLeasesByPath=" + sortedLeasesByPath
|
|
|
+ + "\n leasesById=" + leasesById
|
|
|
+ "\n}";
|
|
|
}
|
|
|
|
|
@@ -552,9 +430,15 @@ public class LeaseManager {
|
|
|
* its leases immediately. This is for use by unit tests.
|
|
|
*/
|
|
|
@VisibleForTesting
|
|
|
- void triggerMonitorCheckNow() {
|
|
|
+ public void triggerMonitorCheckNow() {
|
|
|
Preconditions.checkState(lmthread != null,
|
|
|
"Lease monitor is not running");
|
|
|
lmthread.interrupt();
|
|
|
}
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ public void runLeaseChecks() {
|
|
|
+ checkLeases();
|
|
|
+ }
|
|
|
+
|
|
|
}
|