|
@@ -20,8 +20,10 @@ package org.apache.hadoop.hdfs.qjournal.client;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.Map.Entry;
|
|
import java.util.Map.Entry;
|
|
import java.util.concurrent.TimeoutException;
|
|
import java.util.concurrent.TimeoutException;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
|
|
+import org.apache.hadoop.util.StopWatch;
|
|
import org.apache.hadoop.util.Time;
|
|
import org.apache.hadoop.util.Time;
|
|
|
|
|
|
import com.google.common.base.Joiner;
|
|
import com.google.common.base.Joiner;
|
|
@@ -58,6 +60,7 @@ class QuorumCall<KEY, RESULT> {
|
|
* fraction of the configured timeout for any call.
|
|
* fraction of the configured timeout for any call.
|
|
*/
|
|
*/
|
|
private static final float WAIT_PROGRESS_WARN_THRESHOLD = 0.7f;
|
|
private static final float WAIT_PROGRESS_WARN_THRESHOLD = 0.7f;
|
|
|
|
+ private final StopWatch quorumStopWatch = new StopWatch();
|
|
|
|
|
|
static <KEY, RESULT> QuorumCall<KEY, RESULT> create(
|
|
static <KEY, RESULT> QuorumCall<KEY, RESULT> create(
|
|
Map<KEY, ? extends ListenableFuture<RESULT>> calls) {
|
|
Map<KEY, ? extends ListenableFuture<RESULT>> calls) {
|
|
@@ -83,6 +86,16 @@ class QuorumCall<KEY, RESULT> {
|
|
private QuorumCall() {
|
|
private QuorumCall() {
|
|
// Only instantiated from factory method above
|
|
// Only instantiated from factory method above
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ private void restartQuorumStopWatch() {
|
|
|
|
+ quorumStopWatch.reset().start();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private boolean shouldIncreaseQuorumTimeout(long offset, int millis) {
|
|
|
|
+ long elapsed = quorumStopWatch.now(TimeUnit.MILLISECONDS);
|
|
|
|
+ return elapsed + offset > (millis * WAIT_PROGRESS_INFO_THRESHOLD);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
|
|
/**
|
|
/**
|
|
* Wait for the quorum to achieve a certain number of responses.
|
|
* Wait for the quorum to achieve a certain number of responses.
|
|
@@ -110,6 +123,7 @@ class QuorumCall<KEY, RESULT> {
|
|
long nextLogTime = st + (long)(millis * WAIT_PROGRESS_INFO_THRESHOLD);
|
|
long nextLogTime = st + (long)(millis * WAIT_PROGRESS_INFO_THRESHOLD);
|
|
long et = st + millis;
|
|
long et = st + millis;
|
|
while (true) {
|
|
while (true) {
|
|
|
|
+ restartQuorumStopWatch();
|
|
checkAssertionErrors();
|
|
checkAssertionErrors();
|
|
if (minResponses > 0 && countResponses() >= minResponses) return;
|
|
if (minResponses > 0 && countResponses() >= minResponses) return;
|
|
if (minSuccesses > 0 && countSuccesses() >= minSuccesses) return;
|
|
if (minSuccesses > 0 && countSuccesses() >= minSuccesses) return;
|
|
@@ -139,11 +153,21 @@ class QuorumCall<KEY, RESULT> {
|
|
}
|
|
}
|
|
long rem = et - now;
|
|
long rem = et - now;
|
|
if (rem <= 0) {
|
|
if (rem <= 0) {
|
|
- throw new TimeoutException();
|
|
|
|
|
|
+ // Increase timeout if a full GC occurred after restarting stopWatch
|
|
|
|
+ if (shouldIncreaseQuorumTimeout(0, millis)) {
|
|
|
|
+ et = et + millis;
|
|
|
|
+ } else {
|
|
|
|
+ throw new TimeoutException();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ restartQuorumStopWatch();
|
|
rem = Math.min(rem, nextLogTime - now);
|
|
rem = Math.min(rem, nextLogTime - now);
|
|
rem = Math.max(rem, 1);
|
|
rem = Math.max(rem, 1);
|
|
wait(rem);
|
|
wait(rem);
|
|
|
|
+ // Increase timeout if a full GC occurred after restarting stopWatch
|
|
|
|
+ if (shouldIncreaseQuorumTimeout(-rem, millis)) {
|
|
|
|
+ et = et + millis;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|