|
@@ -30,7 +30,7 @@ import java.util.Map;
|
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
import org.apache.hadoop.hdfs.DFSClient;
|
|
import org.apache.hadoop.hdfs.DFSClient;
|
|
-import org.apache.hadoop.hdfs.DFSOutputStream;
|
|
|
|
|
|
+import org.apache.hadoop.hdfs.DFSClientFaultInjector;
|
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.util.Daemon;
|
|
import org.apache.hadoop.util.Daemon;
|
|
@@ -76,7 +76,7 @@ import org.slf4j.LoggerFactory;
|
|
public class LeaseRenewer {
|
|
public class LeaseRenewer {
|
|
static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);
|
|
static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);
|
|
|
|
|
|
- static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
|
|
|
|
|
|
+ private static long leaseRenewerGraceDefault = 60*1000L;
|
|
static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
|
|
static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
|
|
|
|
|
|
/** Get a {@link LeaseRenewer} instance */
|
|
/** Get a {@link LeaseRenewer} instance */
|
|
@@ -156,9 +156,7 @@ public class LeaseRenewer {
|
|
final LeaseRenewer stored = renewers.get(r.factorykey);
|
|
final LeaseRenewer stored = renewers.get(r.factorykey);
|
|
//Since a renewer may expire, the stored renewer can be different.
|
|
//Since a renewer may expire, the stored renewer can be different.
|
|
if (r == stored) {
|
|
if (r == stored) {
|
|
- if (!r.clientsRunning()) {
|
|
|
|
- renewers.remove(r.factorykey);
|
|
|
|
- }
|
|
|
|
|
|
+ renewers.remove(r.factorykey);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -201,7 +199,7 @@ public class LeaseRenewer {
|
|
|
|
|
|
private LeaseRenewer(Factory.Key factorykey) {
|
|
private LeaseRenewer(Factory.Key factorykey) {
|
|
this.factorykey = factorykey;
|
|
this.factorykey = factorykey;
|
|
- unsyncSetGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
|
|
|
|
|
|
+ unsyncSetGraceSleepPeriod(leaseRenewerGraceDefault);
|
|
|
|
|
|
if (LOG.isTraceEnabled()) {
|
|
if (LOG.isTraceEnabled()) {
|
|
instantiationTrace = StringUtils.stringifyException(
|
|
instantiationTrace = StringUtils.stringifyException(
|
|
@@ -293,8 +291,7 @@ public class LeaseRenewer {
|
|
&& Time.monotonicNow() - emptyTime > gracePeriod;
|
|
&& Time.monotonicNow() - emptyTime > gracePeriod;
|
|
}
|
|
}
|
|
|
|
|
|
- public synchronized void put(final long inodeId, final DFSOutputStream out,
|
|
|
|
- final DFSClient dfsc) {
|
|
|
|
|
|
+ public synchronized void put(final DFSClient dfsc) {
|
|
if (dfsc.isClientRunning()) {
|
|
if (dfsc.isClientRunning()) {
|
|
if (!isRunning() || isRenewerExpired()) {
|
|
if (!isRunning() || isRenewerExpired()) {
|
|
//start a new deamon with a new id.
|
|
//start a new deamon with a new id.
|
|
@@ -328,7 +325,6 @@ public class LeaseRenewer {
|
|
});
|
|
});
|
|
daemon.start();
|
|
daemon.start();
|
|
}
|
|
}
|
|
- dfsc.putFileBeingWritten(inodeId, out);
|
|
|
|
emptyTime = Long.MAX_VALUE;
|
|
emptyTime = Long.MAX_VALUE;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -338,28 +334,6 @@ public class LeaseRenewer {
|
|
emptyTime = time;
|
|
emptyTime = time;
|
|
}
|
|
}
|
|
|
|
|
|
- /** Close a file. */
|
|
|
|
- public void closeFile(final long inodeId, final DFSClient dfsc) {
|
|
|
|
- dfsc.removeFileBeingWritten(inodeId);
|
|
|
|
-
|
|
|
|
- synchronized(this) {
|
|
|
|
- if (dfsc.isFilesBeingWrittenEmpty()) {
|
|
|
|
- dfsclients.remove(dfsc);
|
|
|
|
- }
|
|
|
|
- //update emptyTime if necessary
|
|
|
|
- if (emptyTime == Long.MAX_VALUE) {
|
|
|
|
- for(DFSClient c : dfsclients) {
|
|
|
|
- if (!c.isFilesBeingWrittenEmpty()) {
|
|
|
|
- //found a non-empty file-being-written map
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- //discover the first time that all file-being-written maps are empty.
|
|
|
|
- emptyTime = Time.monotonicNow();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/** Close the given client. */
|
|
/** Close the given client. */
|
|
public synchronized void closeClient(final DFSClient dfsc) {
|
|
public synchronized void closeClient(final DFSClient dfsc) {
|
|
dfsclients.remove(dfsc);
|
|
dfsclients.remove(dfsc);
|
|
@@ -447,14 +421,17 @@ public class LeaseRenewer {
|
|
} catch (SocketTimeoutException ie) {
|
|
} catch (SocketTimeoutException ie) {
|
|
LOG.warn("Failed to renew lease for " + clientsString() + " for "
|
|
LOG.warn("Failed to renew lease for " + clientsString() + " for "
|
|
+ (elapsed/1000) + " seconds. Aborting ...", ie);
|
|
+ (elapsed/1000) + " seconds. Aborting ...", ie);
|
|
|
|
+ List<DFSClient> dfsclientsCopy;
|
|
synchronized (this) {
|
|
synchronized (this) {
|
|
- while (!dfsclients.isEmpty()) {
|
|
|
|
- DFSClient dfsClient = dfsclients.get(0);
|
|
|
|
- dfsClient.closeAllFilesBeingWritten(true);
|
|
|
|
- closeClient(dfsClient);
|
|
|
|
- }
|
|
|
|
|
|
+ DFSClientFaultInjector.get().delayWhenRenewLeaseTimeout();
|
|
|
|
+ dfsclientsCopy = new ArrayList<>(dfsclients);
|
|
|
|
+ dfsclients.clear();
|
|
//Expire the current LeaseRenewer thread.
|
|
//Expire the current LeaseRenewer thread.
|
|
emptyTime = 0;
|
|
emptyTime = 0;
|
|
|
|
+ Factory.INSTANCE.remove(LeaseRenewer.this);
|
|
|
|
+ }
|
|
|
|
+ for (DFSClient dfsClient : dfsclientsCopy) {
|
|
|
|
+ dfsClient.closeAllFilesBeingWritten(true);
|
|
}
|
|
}
|
|
break;
|
|
break;
|
|
} catch (IOException ie) {
|
|
} catch (IOException ie) {
|
|
@@ -511,4 +488,10 @@ public class LeaseRenewer {
|
|
return b.append("]").toString();
|
|
return b.append("]").toString();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ public static void setLeaseRenewerGraceDefault(
|
|
|
|
+ long leaseRenewerGraceDefault) {
|
|
|
|
+ LeaseRenewer.leaseRenewerGraceDefault = leaseRenewerGraceDefault;
|
|
|
|
+ }
|
|
}
|
|
}
|