|
@@ -26,6 +26,7 @@ import java.util.HashMap;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
@@ -79,6 +80,8 @@ public class LeaseRenewer {
|
|
private static long leaseRenewerGraceDefault = 60*1000L;
|
|
private static long leaseRenewerGraceDefault = 60*1000L;
|
|
static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
|
|
static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
|
|
|
|
|
|
|
|
+ private AtomicBoolean isLSRunning = new AtomicBoolean(false);
|
|
|
|
+
|
|
/** Get a {@link LeaseRenewer} instance */
|
|
/** Get a {@link LeaseRenewer} instance */
|
|
public static LeaseRenewer getInstance(final String authority,
|
|
public static LeaseRenewer getInstance(final String authority,
|
|
final UserGroupInformation ugi, final DFSClient dfsc) {
|
|
final UserGroupInformation ugi, final DFSClient dfsc) {
|
|
@@ -87,6 +90,15 @@ public class LeaseRenewer {
|
|
return r;
|
|
return r;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Remove the given renewer from the Factory.
|
|
|
|
+ * Subsequent call will receive new {@link LeaseRenewer} instance.
|
|
|
|
+ * @param renewer Instance to be cleared from Factory
|
|
|
|
+ */
|
|
|
|
+ public static void remove(LeaseRenewer renewer) {
|
|
|
|
+ Factory.INSTANCE.remove(renewer);
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* A factory for sharing {@link LeaseRenewer} objects
|
|
* A factory for sharing {@link LeaseRenewer} objects
|
|
* among {@link DFSClient} instances
|
|
* among {@link DFSClient} instances
|
|
@@ -156,6 +168,9 @@ public class LeaseRenewer {
|
|
final LeaseRenewer stored = renewers.get(r.factorykey);
|
|
final LeaseRenewer stored = renewers.get(r.factorykey);
|
|
//Since a renewer may expire, the stored renewer can be different.
|
|
//Since a renewer may expire, the stored renewer can be different.
|
|
if (r == stored) {
|
|
if (r == stored) {
|
|
|
|
+ // Expire LeaseRenewer daemon thread as soon as possible.
|
|
|
|
+ r.clearClients();
|
|
|
|
+ r.setEmptyTime(0);
|
|
renewers.remove(r.factorykey);
|
|
renewers.remove(r.factorykey);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -241,6 +256,10 @@ public class LeaseRenewer {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private synchronized void clearClients() {
|
|
|
|
+ dfsclients.clear();
|
|
|
|
+ }
|
|
|
|
+
|
|
private synchronized boolean clientsRunning() {
|
|
private synchronized boolean clientsRunning() {
|
|
for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) {
|
|
for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) {
|
|
if (!i.next().isClientRunning()) {
|
|
if (!i.next().isClientRunning()) {
|
|
@@ -292,11 +311,18 @@ public class LeaseRenewer {
|
|
&& Time.monotonicNow() - emptyTime > gracePeriod;
|
|
&& Time.monotonicNow() - emptyTime > gracePeriod;
|
|
}
|
|
}
|
|
|
|
|
|
- public synchronized void put(final DFSClient dfsc) {
|
|
|
|
|
|
+ public synchronized boolean put(final DFSClient dfsc) {
|
|
if (dfsc.isClientRunning()) {
|
|
if (dfsc.isClientRunning()) {
|
|
if (!isRunning() || isRenewerExpired()) {
|
|
if (!isRunning() || isRenewerExpired()) {
|
|
- //start a new deamon with a new id.
|
|
|
|
|
|
+ // Start a new daemon with a new id.
|
|
final int id = ++currentId;
|
|
final int id = ++currentId;
|
|
|
|
+ if (isLSRunning.get()) {
|
|
|
|
+ // Not allowed to add multiple daemons into LeaseRenewer, let client
|
|
|
|
+ // create new LR and continue to acquire lease.
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ isLSRunning.getAndSet(true);
|
|
|
|
+
|
|
daemon = new Daemon(new Runnable() {
|
|
daemon = new Daemon(new Runnable() {
|
|
@Override
|
|
@Override
|
|
public void run() {
|
|
public void run() {
|
|
@@ -328,6 +354,7 @@ public class LeaseRenewer {
|
|
}
|
|
}
|
|
emptyTime = Long.MAX_VALUE;
|
|
emptyTime = Long.MAX_VALUE;
|
|
}
|
|
}
|
|
|
|
+ return true;
|
|
}
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
@@ -426,9 +453,6 @@ public class LeaseRenewer {
|
|
synchronized (this) {
|
|
synchronized (this) {
|
|
DFSClientFaultInjector.get().delayWhenRenewLeaseTimeout();
|
|
DFSClientFaultInjector.get().delayWhenRenewLeaseTimeout();
|
|
dfsclientsCopy = new ArrayList<>(dfsclients);
|
|
dfsclientsCopy = new ArrayList<>(dfsclients);
|
|
- dfsclients.clear();
|
|
|
|
- //Expire the current LeaseRenewer thread.
|
|
|
|
- emptyTime = 0;
|
|
|
|
Factory.INSTANCE.remove(LeaseRenewer.this);
|
|
Factory.INSTANCE.remove(LeaseRenewer.this);
|
|
}
|
|
}
|
|
for (DFSClient dfsClient : dfsclientsCopy) {
|
|
for (DFSClient dfsClient : dfsclientsCopy) {
|