|
@@ -34,6 +34,10 @@ import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.UnresolvedLinkException;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
|
+import org.apache.hadoop.util.Daemon;
|
|
|
+
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
+import com.google.common.base.Preconditions;
|
|
|
|
|
|
import static org.apache.hadoop.hdfs.server.common.Util.now;
|
|
|
|
|
@@ -82,6 +86,9 @@ public class LeaseManager {
|
|
|
//
|
|
|
private SortedMap<String, Lease> sortedLeasesByPath = new TreeMap<String, Lease>();
|
|
|
|
|
|
+ private Daemon lmthread;
|
|
|
+ private volatile boolean shouldRunMonitor;
|
|
|
+
|
|
|
LeaseManager(FSNamesystem fsnamesystem) {this.fsnamesystem = fsnamesystem;}
|
|
|
|
|
|
Lease getLease(String holder) {
|
|
@@ -367,18 +374,18 @@ public class LeaseManager {
|
|
|
|
|
|
/** Check leases periodically. */
|
|
|
public void run() {
|
|
|
- for(; fsnamesystem.isRunning(); ) {
|
|
|
- fsnamesystem.writeLock();
|
|
|
+ for(; shouldRunMonitor && fsnamesystem.isRunning(); ) {
|
|
|
try {
|
|
|
- if (!fsnamesystem.isInSafeMode()) {
|
|
|
- checkLeases();
|
|
|
+ fsnamesystem.writeLockInterruptibly();
|
|
|
+ try {
|
|
|
+ if (!fsnamesystem.isInSafeMode()) {
|
|
|
+ checkLeases();
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ fsnamesystem.writeUnlock();
|
|
|
}
|
|
|
- } finally {
|
|
|
- fsnamesystem.writeUnlock();
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- try {
|
|
|
+
|
|
|
+
|
|
|
Thread.sleep(HdfsServerConstants.NAMENODE_LEASE_RECHECK_INTERVAL);
|
|
|
} catch(InterruptedException ie) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -437,4 +444,36 @@ public class LeaseManager {
|
|
|
+ "\n sortedLeasesByPath=" + sortedLeasesByPath
|
|
|
+ "\n}";
|
|
|
}
|
|
|
+
|
|
|
+ void startMonitor() {
|
|
|
+ Preconditions.checkState(lmthread == null,
|
|
|
+ "Lease Monitor already running");
|
|
|
+ shouldRunMonitor = true;
|
|
|
+ lmthread = new Daemon(new Monitor());
|
|
|
+ lmthread.start();
|
|
|
+ }
|
|
|
+
|
|
|
+ void stopMonitor() {
|
|
|
+ if (lmthread != null) {
|
|
|
+ shouldRunMonitor = false;
|
|
|
+ try {
|
|
|
+ lmthread.interrupt();
|
|
|
+ lmthread.join(3000);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ LOG.warn("Encountered exception ", ie);
|
|
|
+ }
|
|
|
+ lmthread = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Trigger the currently-running Lease monitor to re-check
|
|
|
+ * its leases immediately. This is for use by unit tests.
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ void triggerMonitorCheckNow() {
|
|
|
+ Preconditions.checkState(lmthread != null,
|
|
|
+ "Lease monitor is not running");
|
|
|
+ lmthread.interrupt();
|
|
|
+ }
|
|
|
}
|