|
@@ -29,6 +29,7 @@ import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
+import org.apache.commons.lang.math.LongRange;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -61,15 +62,13 @@ import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.ipc.Server;
|
|
|
import org.apache.hadoop.security.SecurityUtil;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.util.StopWatch;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.base.Charsets;
|
|
|
import com.google.common.base.Preconditions;
|
|
|
import com.google.common.collect.ImmutableList;
|
|
|
-import com.google.common.collect.Range;
|
|
|
-import com.google.common.collect.Ranges;
|
|
|
import com.google.protobuf.TextFormat;
|
|
|
-import org.apache.hadoop.util.StopWatch;
|
|
|
|
|
|
/**
|
|
|
* A JournalNode can manage journals for several clusters at once.
|
|
@@ -793,8 +792,8 @@ public class Journal implements Closeable {
|
|
|
// Paranoid sanity check: if the new log is shorter than the log we
|
|
|
// currently have, we should not end up discarding any transactions
|
|
|
// which are already Committed.
|
|
|
- if (txnRange(currentSegment).contains(committedTxnId.get()) &&
|
|
|
- !txnRange(segment).contains(committedTxnId.get())) {
|
|
|
+ if (txnRange(currentSegment).containsLong(committedTxnId.get()) &&
|
|
|
+ !txnRange(segment).containsLong(committedTxnId.get())) {
|
|
|
throw new AssertionError(
|
|
|
"Cannot replace segment " +
|
|
|
TextFormat.shortDebugString(currentSegment) +
|
|
@@ -812,7 +811,7 @@ public class Journal implements Closeable {
|
|
|
|
|
|
// If we're shortening the log, update our highest txid
|
|
|
// used for lag metrics.
|
|
|
- if (txnRange(currentSegment).contains(highestWrittenTxId)) {
|
|
|
+ if (txnRange(currentSegment).containsLong(highestWrittenTxId)) {
|
|
|
highestWrittenTxId = segment.getEndTxId();
|
|
|
}
|
|
|
}
|
|
@@ -856,10 +855,10 @@ public class Journal implements Closeable {
|
|
|
TextFormat.shortDebugString(newData));
|
|
|
}
|
|
|
|
|
|
- private Range<Long> txnRange(SegmentStateProto seg) {
|
|
|
+ private LongRange txnRange(SegmentStateProto seg) {
|
|
|
Preconditions.checkArgument(seg.hasEndTxId(),
|
|
|
"invalid segment: %s", seg);
|
|
|
- return Ranges.closed(seg.getStartTxId(), seg.getEndTxId());
|
|
|
+ return new LongRange(seg.getStartTxId(), seg.getEndTxId());
|
|
|
}
|
|
|
|
|
|
/**
|