|
@@ -127,6 +127,11 @@ public class VolumeScanner extends Thread {
|
|
|
*/
|
|
|
private boolean stopping = false;
|
|
|
|
|
|
+ /**
|
|
|
+ * The monotonic minute that the volume scanner was started on.
|
|
|
+ */
|
|
|
+ private long startMinute = 0;
|
|
|
+
|
|
|
/**
|
|
|
* The current minute, in monotonic terms.
|
|
|
*/
|
|
@@ -297,18 +302,18 @@ public class VolumeScanner extends Thread {
|
|
|
private void expireOldScannedBytesRecords(long monotonicMs) {
|
|
|
long newMinute =
|
|
|
TimeUnit.MINUTES.convert(monotonicMs, TimeUnit.MILLISECONDS);
|
|
|
- newMinute = newMinute % MINUTES_PER_HOUR;
|
|
|
if (curMinute == newMinute) {
|
|
|
return;
|
|
|
}
|
|
|
// If a minute or more has gone past since we last updated the scannedBytes
|
|
|
// array, zero out the slots corresponding to those minutes.
|
|
|
for (long m = curMinute + 1; m <= newMinute; m++) {
|
|
|
- LOG.trace("{}: updateScannedBytes is zeroing out slot {}. " +
|
|
|
- "curMinute = {}; newMinute = {}", this, m % MINUTES_PER_HOUR,
|
|
|
- curMinute, newMinute);
|
|
|
- scannedBytesSum -= scannedBytes[(int)(m % MINUTES_PER_HOUR)];
|
|
|
- scannedBytes[(int)(m % MINUTES_PER_HOUR)] = 0;
|
|
|
+ int slotIdx = (int)(m % MINUTES_PER_HOUR);
|
|
|
+ LOG.trace("{}: updateScannedBytes is zeroing out slotIdx {}. " +
|
|
|
+ "curMinute = {}; newMinute = {}", this, slotIdx,
|
|
|
+ curMinute, newMinute);
|
|
|
+ scannedBytesSum -= scannedBytes[slotIdx];
|
|
|
+ scannedBytes[slotIdx] = 0;
|
|
|
}
|
|
|
curMinute = newMinute;
|
|
|
}
|
|
@@ -425,14 +430,28 @@ public class VolumeScanner extends Thread {
|
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
|
- static boolean calculateShouldScan(long targetBytesPerSec,
|
|
|
- long scannedBytesSum) {
|
|
|
- long effectiveBytesPerSec =
|
|
|
- scannedBytesSum / (SECONDS_PER_MINUTE * MINUTES_PER_HOUR);
|
|
|
+ static boolean calculateShouldScan(String storageId, long targetBytesPerSec,
|
|
|
+ long scannedBytesSum, long startMinute, long curMinute) {
|
|
|
+ long runMinutes = curMinute - startMinute;
|
|
|
+ long effectiveBytesPerSec;
|
|
|
+ if (runMinutes <= 0) {
|
|
|
+ // avoid division by zero
|
|
|
+ effectiveBytesPerSec = scannedBytesSum;
|
|
|
+ } else {
|
|
|
+ if (runMinutes > MINUTES_PER_HOUR) {
|
|
|
+ // we only keep an hour's worth of rate information
|
|
|
+ runMinutes = MINUTES_PER_HOUR;
|
|
|
+ }
|
|
|
+ effectiveBytesPerSec = scannedBytesSum /
|
|
|
+ (SECONDS_PER_MINUTE * runMinutes);
|
|
|
+ }
|
|
|
+
|
|
|
boolean shouldScan = effectiveBytesPerSec <= targetBytesPerSec;
|
|
|
- LOG.trace("calculateShouldScan: effectiveBytesPerSec = {}, and " +
|
|
|
- "targetBytesPerSec = {}. shouldScan = {}",
|
|
|
- effectiveBytesPerSec, targetBytesPerSec, shouldScan);
|
|
|
+ LOG.trace("{}: calculateShouldScan: effectiveBytesPerSec = {}, and " +
|
|
|
+ "targetBytesPerSec = {}. startMinute = {}, curMinute = {}, " +
|
|
|
+ "shouldScan = {}",
|
|
|
+ storageId, effectiveBytesPerSec, targetBytesPerSec,
|
|
|
+ startMinute, curMinute, shouldScan);
|
|
|
return shouldScan;
|
|
|
}
|
|
|
|
|
@@ -450,7 +469,8 @@ public class VolumeScanner extends Thread {
|
|
|
long monotonicMs = Time.monotonicNow();
|
|
|
expireOldScannedBytesRecords(monotonicMs);
|
|
|
|
|
|
- if (!calculateShouldScan(conf.targetBytesPerSec, scannedBytesSum)) {
|
|
|
+ if (!calculateShouldScan(volume.getStorageID(), conf.targetBytesPerSec,
|
|
|
+ scannedBytesSum, startMinute, curMinute)) {
|
|
|
// If neededBytesPerSec is too low, then wait few seconds for some old
|
|
|
// scannedBytes records to expire.
|
|
|
return 30000L;
|
|
@@ -533,6 +553,10 @@ public class VolumeScanner extends Thread {
|
|
|
|
|
|
@Override
|
|
|
public void run() {
|
|
|
+ // Record the minute on which the scanner started.
|
|
|
+ this.startMinute =
|
|
|
+ TimeUnit.MINUTES.convert(Time.monotonicNow(), TimeUnit.MILLISECONDS);
|
|
|
+ this.curMinute = startMinute;
|
|
|
try {
|
|
|
LOG.trace("{}: thread starting.", this);
|
|
|
resultHandler.setup(this);
|